From bebb829fcd95099ed0abb41e6c45032c0886d6a9 Mon Sep 17 00:00:00 2001 From: Douglas Lee Date: Thu, 4 Jul 2024 13:39:05 +0800 Subject: [PATCH 01/25] fix(plugins/http-log): improve concurrency when max_batch_size is set to 1 --- kong/plugins/http-log/handler.lua | 37 ++++++++++++++++++++++++++++++- 1 file changed, 36 insertions(+), 1 deletion(-) diff --git a/kong/plugins/http-log/handler.lua b/kong/plugins/http-log/handler.lua index fd1d0cd48eeb..da68c5f792b3 100644 --- a/kong/plugins/http-log/handler.lua +++ b/kong/plugins/http-log/handler.lua @@ -189,12 +189,47 @@ function HttpLogHandler:log(conf) local queue_conf = Queue.get_plugin_params("http-log", conf, make_queue_name(conf)) kong.log.debug("Queue name automatically configured based on configuration parameters to: ", queue_conf.name) + local entry = cjson.encode(kong.log.serialize()) + + if queue_conf.max_batch_size == 1 then + local ok ,err = ngx.timer.at(0, function() + local retry_count = 0 + local start_time = ngx.now() + while true do + -- fixme: optmize the log as these may not appropriate at here + kong.log.debug("passing 1 entries to handler") + local ok, err = send_entries(conf, { entry }) -- fixme: should we pcall? + if ok then + kong.log.debug("handler processed 1 entries successfully") + break + end + if not ok then + kong.log.warn(string.format("handler could not process entries: %s", tostring(err or "no error details returned by handler"))) + end + + if (ngx.now() - start_time) > queue_conf.max_retry_time then + kong.log.err(string.format( + "could not send entries, giving up after %d retries. 1 queue entries were lost", + retry_count)) + break + end + + local delay = math.min(queue_conf.max_retry_delay, 2 ^ retry_count * queue_conf.initial_retry_delay) + ngx.sleep(delay) + retry_count = retry_count + 1 + end + end) + if not ok then + kong.log.err("failed to create timer: ", err) + end + return + end local ok, err = Queue.enqueue( queue_conf, send_entries, conf, - cjson.encode(kong.log.serialize()) + entry ) if not ok then kong.log.err("Failed to enqueue log entry to log server: ", err) From 390e3d387ca47d07bd1a97f22c5f52cfc28d263c Mon Sep 17 00:00:00 2001 From: Douglas Lee Date: Thu, 4 Jul 2024 13:52:50 +0800 Subject: [PATCH 02/25] add changelog --- changelog/unreleased/kong/http-log-Improvement.yml | 4 ++++ 1 file changed, 4 insertions(+) create mode 100644 changelog/unreleased/kong/http-log-Improvement.yml diff --git a/changelog/unreleased/kong/http-log-Improvement.yml b/changelog/unreleased/kong/http-log-Improvement.yml new file mode 100644 index 000000000000..fb8ab86dee2b --- /dev/null +++ b/changelog/unreleased/kong/http-log-Improvement.yml @@ -0,0 +1,4 @@ +message: | + **HTTP-Log**: Improved the concurrency of log sending when `max_batch_size` is 1.` +type: bugfix +scope: Plugin From 8bc3c12e77f4c86bebfd6876461888a690768c7c Mon Sep 17 00:00:00 2001 From: Douglas Lee Date: Thu, 4 Jul 2024 14:50:43 +0800 Subject: [PATCH 03/25] add test case --- spec/03-plugins/03-http-log/01-log_spec.lua | 19 ++++++++++++++++++- 1 file changed, 18 insertions(+), 1 deletion(-) diff --git a/spec/03-plugins/03-http-log/01-log_spec.lua b/spec/03-plugins/03-http-log/01-log_spec.lua index fb96cb03d38e..9ff4b97e2952 100644 --- a/spec/03-plugins/03-http-log/01-log_spec.lua +++ b/spec/03-plugins/03-http-log/01-log_spec.lua @@ -92,7 +92,10 @@ for _, strategy in helpers.each_strategy() do http_endpoint = "http://" .. helpers.mock_upstream_host .. ":" .. helpers.mock_upstream_port - .. "/post_log/http_tag" + .. "/post_log/http_tag", + queue = { + max_batch_size = 2, + } } } @@ -638,6 +641,20 @@ for _, strategy in helpers.each_strategy() do admin_client:close() end) + + it("should not rely on queue when max_batch_size is 1", function() + reset_log("http") + local res = proxy_client:get("/status/200", { + headers = { + ["Host"] = "http_logging.test" + } + }) + assert.res_status(200, res) + + local entries = get_log("http", 1) + assert.same("127.0.0.1", entries[1].client_ip) + assert.logfile().has.no.line("processing queue", true) -- should not relay on queue + end) end) -- test both with a single worker for a deterministic test, From 5d10962579ffd6465923ecc8be259a24ed086c2d Mon Sep 17 00:00:00 2001 From: Douglas Lee Date: Thu, 4 Jul 2024 14:50:50 +0800 Subject: [PATCH 04/25] update --- kong/plugins/http-log/handler.lua | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/kong/plugins/http-log/handler.lua b/kong/plugins/http-log/handler.lua index da68c5f792b3..4986cb6fb27b 100644 --- a/kong/plugins/http-log/handler.lua +++ b/kong/plugins/http-log/handler.lua @@ -196,11 +196,8 @@ function HttpLogHandler:log(conf) local retry_count = 0 local start_time = ngx.now() while true do - -- fixme: optmize the log as these may not appropriate at here - kong.log.debug("passing 1 entries to handler") - local ok, err = send_entries(conf, { entry }) -- fixme: should we pcall? + local ok, err = send_entries(conf, { entry }) if ok then - kong.log.debug("handler processed 1 entries successfully") break end if not ok then From 445fcf716f411cba82d59dcdcd15f78cea1f7030 Mon Sep 17 00:00:00 2001 From: Yusheng Li Date: Thu, 4 Jul 2024 14:58:29 +0800 Subject: [PATCH 05/25] Update changelog/unreleased/kong/http-log-Improvement.yml MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Hans Hübner --- changelog/unreleased/kong/http-log-Improvement.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/changelog/unreleased/kong/http-log-Improvement.yml b/changelog/unreleased/kong/http-log-Improvement.yml index fb8ab86dee2b..47fa5f52d3a7 100644 --- a/changelog/unreleased/kong/http-log-Improvement.yml +++ b/changelog/unreleased/kong/http-log-Improvement.yml @@ -1,4 +1,4 @@ message: | - **HTTP-Log**: Improved the concurrency of log sending when `max_batch_size` is 1.` + **HTTP-Log**: When `queue.max_batch_size` is 1, log entries are now sent in separate, parallel HTTP requests. Previously, they were sent sequentially in FIFO order. type: bugfix scope: Plugin From 43d9df95f3865cc482e942893ec210cbd607a831 Mon Sep 17 00:00:00 2001 From: Douglas Lee Date: Thu, 4 Jul 2024 15:31:28 +0800 Subject: [PATCH 06/25] modify changelog file name --- ...{http-log-Improvement.yml => http-log-concurrent-optimize.yml} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename changelog/unreleased/kong/{http-log-Improvement.yml => http-log-concurrent-optimize.yml} (100%) diff --git a/changelog/unreleased/kong/http-log-Improvement.yml b/changelog/unreleased/kong/http-log-concurrent-optimize.yml similarity index 100% rename from changelog/unreleased/kong/http-log-Improvement.yml rename to changelog/unreleased/kong/http-log-concurrent-optimize.yml From 41f46bf43f1c7d7526a8d3c808d4a10ce618f716 Mon Sep 17 00:00:00 2001 From: Douglas Lee Date: Thu, 4 Jul 2024 16:16:50 +0800 Subject: [PATCH 07/25] update log message --- kong/plugins/http-log/handler.lua | 5 +++-- kong/tools/queue.lua | 5 +++-- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/kong/plugins/http-log/handler.lua b/kong/plugins/http-log/handler.lua index 4986cb6fb27b..546a170143de 100644 --- a/kong/plugins/http-log/handler.lua +++ b/kong/plugins/http-log/handler.lua @@ -204,9 +204,10 @@ function HttpLogHandler:log(conf) kong.log.warn(string.format("handler could not process entries: %s", tostring(err or "no error details returned by handler"))) end - if (ngx.now() - start_time) > queue_conf.max_retry_time then + local spent = ngx.now() - start_time + if spent > queue_conf.max_retry_time then kong.log.err(string.format( - "could not send entries, giving up after %d retries. 1 queue entries were lost", + "could not send entries due to max_retry_time exceeded. 1 queue entries were lost", retry_count)) break end diff --git a/kong/tools/queue.lua b/kong/tools/queue.lua index dc6a8fb6a108..aeaeec104983 100644 --- a/kong/tools/queue.lua +++ b/kong/tools/queue.lua @@ -409,9 +409,10 @@ function Queue:process_once() self:log_err("handler returned falsy value but no error information") end - if (now() - start_time) > self.max_retry_time then + local spent = ngx.now() - start_time + if spent > self.max_retry_time then self:log_err( - "could not send entries, giving up after %d retries. %d queue entries were lost", + "could not send entries due to max_retry_time exceeded. %d queue entries were lost", retry_count, entry_count) break end From fcd47897ddf3a612f8a6b9e20052ce269c5ece69 Mon Sep 17 00:00:00 2001 From: Douglas Lee Date: Thu, 4 Jul 2024 17:25:10 +0800 Subject: [PATCH 08/25] update test case --- kong/tools/queue.lua | 2 +- spec/01-unit/27-queue_spec.lua | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/kong/tools/queue.lua b/kong/tools/queue.lua index aeaeec104983..6ae91fbc5cc2 100644 --- a/kong/tools/queue.lua +++ b/kong/tools/queue.lua @@ -409,7 +409,7 @@ function Queue:process_once() self:log_err("handler returned falsy value but no error information") end - local spent = ngx.now() - start_time + local spent = now() - start_time if spent > self.max_retry_time then self:log_err( "could not send entries due to max_retry_time exceeded. %d queue entries were lost", diff --git a/spec/01-unit/27-queue_spec.lua b/spec/01-unit/27-queue_spec.lua index 548be6b42bbe..5f94c9d73094 100644 --- a/spec/01-unit/27-queue_spec.lua +++ b/spec/01-unit/27-queue_spec.lua @@ -786,7 +786,7 @@ describe("plugin queue", function() assert.equal("One", processed[1]) assert.equal("Three", processed[2]) assert.match_re(log_messages, 'WARN \\[\\] queue continue-processing: handler could not process entries: .*: hard error') - assert.match_re(log_messages, 'ERR \\[\\] queue continue-processing: could not send entries, giving up after \\d retries. 1 queue entries were lost') + assert.match_re(log_messages, 'ERR \\[\\] queue continue-processing: could not send entries due to max_retry_time exceeded. \\d queue entries were lost') end) it("sanity check for function Queue.is_full() & Queue.can_enqueue()", function() From e3305caf0ddaa22900f86baa2d4627e65be11c72 Mon Sep 17 00:00:00 2001 From: Douglas Lee Date: Thu, 4 Jul 2024 18:05:52 +0800 Subject: [PATCH 09/25] why do we need to change this to 7 to make ti test passed? --- spec/01-unit/27-queue_spec.lua | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spec/01-unit/27-queue_spec.lua b/spec/01-unit/27-queue_spec.lua index 5f94c9d73094..74c9790a4fa5 100644 --- a/spec/01-unit/27-queue_spec.lua +++ b/spec/01-unit/27-queue_spec.lua @@ -342,7 +342,7 @@ describe("plugin queue", function() enqueue("another value") wait_until_queue_done("retry-give-up") assert.match_re(log_messages, 'WARN .* handler could not process entries: FAIL FAIL FAIL') - assert.match_re(log_messages, 'ERR .*1 queue entries were lost') + assert.match_re(log_messages, 'ERR .* 7 queue entries were lost') end) it("warns when queue reaches its capacity limit", function() From 6e10ce46100ce0dd8ec3f81f43d1df66a01a7af1 Mon Sep 17 00:00:00 2001 From: Douglas Lee Date: Thu, 4 Jul 2024 18:14:50 +0800 Subject: [PATCH 10/25] code style: localize --- kong/plugins/http-log/handler.lua | 20 ++++++++++++-------- kong/tools/queue.lua | 3 +-- 2 files changed, 13 insertions(+), 10 deletions(-) diff --git a/kong/plugins/http-log/handler.lua b/kong/plugins/http-log/handler.lua index 546a170143de..396d89caea82 100644 --- a/kong/plugins/http-log/handler.lua +++ b/kong/plugins/http-log/handler.lua @@ -14,6 +14,10 @@ local tonumber = tonumber local fmt = string.format local pairs = pairs local max = math.max +local min = math.min +local timer_at = ngx.timer.at +local now = ngx.now +local sleep = ngx.sleep local sandbox_opts = { env = { kong = kong, ngx = ngx } } @@ -192,28 +196,28 @@ function HttpLogHandler:log(conf) local entry = cjson.encode(kong.log.serialize()) if queue_conf.max_batch_size == 1 then - local ok ,err = ngx.timer.at(0, function() + local ok ,err = timer_at(0, function() local retry_count = 0 - local start_time = ngx.now() + local start_time = now() while true do local ok, err = send_entries(conf, { entry }) if ok then break end if not ok then - kong.log.warn(string.format("handler could not process entries: %s", tostring(err or "no error details returned by handler"))) + kong.log.warn(fmt("handler could not process entries: %s", + tostring(err or "no error details returned by handler"))) end - local spent = ngx.now() - start_time - if spent > queue_conf.max_retry_time then - kong.log.err(string.format( + if (now() - start_time) > queue_conf.max_retry_time then + kong.log.err(fmt( "could not send entries due to max_retry_time exceeded. 1 queue entries were lost", retry_count)) break end - local delay = math.min(queue_conf.max_retry_delay, 2 ^ retry_count * queue_conf.initial_retry_delay) - ngx.sleep(delay) + local delay = min(queue_conf.max_retry_delay, 2 ^ retry_count * queue_conf.initial_retry_delay) + sleep(delay) retry_count = retry_count + 1 end end) diff --git a/kong/tools/queue.lua b/kong/tools/queue.lua index 6ae91fbc5cc2..15034aed5558 100644 --- a/kong/tools/queue.lua +++ b/kong/tools/queue.lua @@ -409,8 +409,7 @@ function Queue:process_once() self:log_err("handler returned falsy value but no error information") end - local spent = now() - start_time - if spent > self.max_retry_time then + if (now() - start_time) > self.max_retry_time then self:log_err( "could not send entries due to max_retry_time exceeded. %d queue entries were lost", retry_count, entry_count) From 8baf0234b580b852e663dcf7189aef3384e6a2b4 Mon Sep 17 00:00:00 2001 From: Douglas Lee Date: Fri, 5 Jul 2024 14:08:10 +0800 Subject: [PATCH 11/25] fix test case --- kong/plugins/http-log/handler.lua | 4 +--- kong/tools/queue.lua | 2 +- spec/01-unit/27-queue_spec.lua | 2 +- 3 files changed, 3 insertions(+), 5 deletions(-) diff --git a/kong/plugins/http-log/handler.lua b/kong/plugins/http-log/handler.lua index 396d89caea82..d4226446d5dc 100644 --- a/kong/plugins/http-log/handler.lua +++ b/kong/plugins/http-log/handler.lua @@ -210,9 +210,7 @@ function HttpLogHandler:log(conf) end if (now() - start_time) > queue_conf.max_retry_time then - kong.log.err(fmt( - "could not send entries due to max_retry_time exceeded. 1 queue entries were lost", - retry_count)) + kong.log.err("could not send entries due to max_retry_time exceeded. 1 queue entries were lost") break end diff --git a/kong/tools/queue.lua b/kong/tools/queue.lua index 15034aed5558..d19f50c7b784 100644 --- a/kong/tools/queue.lua +++ b/kong/tools/queue.lua @@ -412,7 +412,7 @@ function Queue:process_once() if (now() - start_time) > self.max_retry_time then self:log_err( "could not send entries due to max_retry_time exceeded. %d queue entries were lost", - retry_count, entry_count) + entry_count) break end diff --git a/spec/01-unit/27-queue_spec.lua b/spec/01-unit/27-queue_spec.lua index 74c9790a4fa5..5f94c9d73094 100644 --- a/spec/01-unit/27-queue_spec.lua +++ b/spec/01-unit/27-queue_spec.lua @@ -342,7 +342,7 @@ describe("plugin queue", function() enqueue("another value") wait_until_queue_done("retry-give-up") assert.match_re(log_messages, 'WARN .* handler could not process entries: FAIL FAIL FAIL') - assert.match_re(log_messages, 'ERR .* 7 queue entries were lost') + assert.match_re(log_messages, 'ERR .*1 queue entries were lost') end) it("warns when queue reaches its capacity limit", function() From 9fb05bfab5d33b82197b9cbfa2be71c8e6b78a5a Mon Sep 17 00:00:00 2001 From: Yusheng Li Date: Fri, 5 Jul 2024 14:31:39 +0800 Subject: [PATCH 12/25] Update spec/03-plugins/03-http-log/01-log_spec.lua MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Hans Hübner --- spec/03-plugins/03-http-log/01-log_spec.lua | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spec/03-plugins/03-http-log/01-log_spec.lua b/spec/03-plugins/03-http-log/01-log_spec.lua index 9ff4b97e2952..4b2bdd8bb18f 100644 --- a/spec/03-plugins/03-http-log/01-log_spec.lua +++ b/spec/03-plugins/03-http-log/01-log_spec.lua @@ -642,7 +642,7 @@ for _, strategy in helpers.each_strategy() do admin_client:close() end) - it("should not rely on queue when max_batch_size is 1", function() + it("should not use queue when max_batch_size is 1", function() reset_log("http") local res = proxy_client:get("/status/200", { headers = { From 0ed73651460c77a06fbcb8bf5bd1084c64d262c1 Mon Sep 17 00:00:00 2001 From: Yusheng Li Date: Fri, 5 Jul 2024 14:31:45 +0800 Subject: [PATCH 13/25] Update spec/03-plugins/03-http-log/01-log_spec.lua MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Hans Hübner --- spec/03-plugins/03-http-log/01-log_spec.lua | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spec/03-plugins/03-http-log/01-log_spec.lua b/spec/03-plugins/03-http-log/01-log_spec.lua index 4b2bdd8bb18f..5f1a19ac4a75 100644 --- a/spec/03-plugins/03-http-log/01-log_spec.lua +++ b/spec/03-plugins/03-http-log/01-log_spec.lua @@ -653,7 +653,7 @@ for _, strategy in helpers.each_strategy() do local entries = get_log("http", 1) assert.same("127.0.0.1", entries[1].client_ip) - assert.logfile().has.no.line("processing queue", true) -- should not relay on queue + assert.logfile().has.no.line("processing queue", true) -- should not use queue end) end) From 51ca784e7f892b4138664ec23ba11be32e7fc42b Mon Sep 17 00:00:00 2001 From: Douglas Lee Date: Fri, 5 Jul 2024 14:33:42 +0800 Subject: [PATCH 14/25] update message --- kong/plugins/http-log/handler.lua | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kong/plugins/http-log/handler.lua b/kong/plugins/http-log/handler.lua index d4226446d5dc..22ae29c1b331 100644 --- a/kong/plugins/http-log/handler.lua +++ b/kong/plugins/http-log/handler.lua @@ -210,7 +210,7 @@ function HttpLogHandler:log(conf) end if (now() - start_time) > queue_conf.max_retry_time then - kong.log.err("could not send entries due to max_retry_time exceeded. 1 queue entries were lost") + kong.log.err("could not send entries due to max_retry_time exceeded. 1 queue entry was lost") break end From 368b0be04a07e5bb0fc146c27e07a626168cb80e Mon Sep 17 00:00:00 2001 From: Douglas Lee Date: Wed, 17 Jul 2024 14:54:36 +0800 Subject: [PATCH 15/25] fix(plugins/http-log): improve concurrency when max_batch_size is set to 1 --- kong/plugins/http-log/handler.lua | 29 +---- kong/tools/queue.lua | 206 ++++++++++++++++-------------- 2 files changed, 118 insertions(+), 117 deletions(-) diff --git a/kong/plugins/http-log/handler.lua b/kong/plugins/http-log/handler.lua index 22ae29c1b331..382a0654335b 100644 --- a/kong/plugins/http-log/handler.lua +++ b/kong/plugins/http-log/handler.lua @@ -14,10 +14,7 @@ local tonumber = tonumber local fmt = string.format local pairs = pairs local max = math.max -local min = math.min local timer_at = ngx.timer.at -local now = ngx.now -local sleep = ngx.sleep local sandbox_opts = { env = { kong = kong, ngx = ngx } } @@ -196,28 +193,12 @@ function HttpLogHandler:log(conf) local entry = cjson.encode(kong.log.serialize()) if queue_conf.max_batch_size == 1 then - local ok ,err = timer_at(0, function() - local retry_count = 0 - local start_time = now() - while true do - local ok, err = send_entries(conf, { entry }) - if ok then - break - end - if not ok then - kong.log.warn(fmt("handler could not process entries: %s", - tostring(err or "no error details returned by handler"))) - end - - if (now() - start_time) > queue_conf.max_retry_time then - kong.log.err("could not send entries due to max_retry_time exceeded. 1 queue entry was lost") - break - end - - local delay = min(queue_conf.max_retry_delay, 2 ^ retry_count * queue_conf.initial_retry_delay) - sleep(delay) - retry_count = retry_count + 1 + local queue = Queue.create(queue_conf, send_entries, conf) + local ok, err = timer_at(0, function(premature) + if premature then + return end + queue:handle({ entry }) end) if not ok then kong.log.err("failed to create timer: ", err) diff --git a/kong/tools/queue.lua b/kong/tools/queue.lua index d19f50c7b784..8f45ca24ee6b 100644 --- a/kong/tools/queue.lua +++ b/kong/tools/queue.lua @@ -203,6 +203,14 @@ end -- @param opts table, requires `name`, optionally includes `retry_count`, `max_coalescing_delay` and `max_batch_size` -- @return table: a Queue object. local function get_or_create_queue(queue_conf, handler, handler_conf) + assert(type(queue_conf) == "table", + "arg #1 (queue_conf) must be a table") + assert(type(handler) == "function", + "arg #2 (handler) must be a function") + assert(handler_conf == nil or type(handler_conf) == "table", + "arg #3 (handler_conf) must be a table or nil") + assert(type(queue_conf.name) == "string", + "arg #1 (queue_conf) must include a name") local name = assert(queue_conf.name) local key = _make_queue_key(name) @@ -215,7 +223,70 @@ local function get_or_create_queue(queue_conf, handler, handler_conf) return queue end - queue = { + queue = Queue.create(queue_conf, handler, handler_conf) + + kong.timer:named_at("queue " .. key, 0, function(_, q) + while q:count() > 0 do + q:log_debug("processing queue") + q:process_once() + end + q:log_debug("done processing queue") + queues[key] = nil + end, queue) + + queues[key] = queue + + queue:log_debug("queue created") + + return queue +end + +function Queue.create(queue_conf, handler, handler_conf) + assert(type(queue_conf) == "table", + "arg #1 (queue_conf) must be a table") + assert(type(handler) == "function", + "arg #2 (handler) must be a function") + assert(handler_conf == nil or type(handler_conf) == "table", + "arg #3 (handler_conf) must be a table or nil") + assert(type(queue_conf.name) == "string", + "arg #1 (queue_conf) must include a name") + + + assert( + type(queue_conf.max_batch_size) == "number", + "arg #1 (queue_conf) max_batch_size must be a number" + ) + assert( + type(queue_conf.max_coalescing_delay) == "number", + "arg #1 (queue_conf) max_coalescing_delay must be a number" + ) + assert( + type(queue_conf.max_entries) == "number", + "arg #1 (queue_conf) max_entries must be a number" + ) + assert( + type(queue_conf.max_retry_time) == "number", + "arg #1 (queue_conf) max_retry_time must be a number" + ) + assert( + type(queue_conf.initial_retry_delay) == "number", + "arg #1 (queue_conf) initial_retry_delay must be a number" + ) + assert( + type(queue_conf.max_retry_delay) == "number", + "arg #1 (queue_conf) max_retry_delay must be a number" + ) + + local max_bytes_type = type(queue_conf.max_bytes) + assert( + max_bytes_type == "nil" or max_bytes_type == "number", + "arg #1 (queue_conf) max_bytes must be a number or nil" + ) + + local name = assert(queue_conf.name) + local key = _make_queue_key(name) + + local queue = { -- Queue parameters from the enqueue call name = name, key = key, @@ -238,22 +309,7 @@ local function get_or_create_queue(queue_conf, handler, handler_conf) queue[option] = value end - queue = setmetatable(queue, Queue_mt) - - kong.timer:named_at("queue " .. key, 0, function(_, q) - while q:count() > 0 do - q:log_debug("processing queue") - q:process_once() - end - q:log_debug("done processing queue") - queues[key] = nil - end, queue) - - queues[key] = queue - - queue:log_debug("queue created") - - return queue + return setmetatable(queue, Queue_mt) end @@ -338,6 +394,45 @@ function Queue:drop_oldest_entry() self:delete_frontmost_entry() end +function Queue:handle(entries) + local entry_count = #entries + + local start_time = now() + local retry_count = 0 + while true do + self:log_debug("passing %d entries to handler", entry_count) + local status, ok, err = pcall(self.handler, self.handler_conf, entries) + if status and ok == true then + self:log_debug("handler processed %d entries successfully", entry_count) + break + end + + if not status then + -- protected call failed, ok is the error message + err = ok + end + + self:log_warn("handler could not process entries: %s", tostring(err or "no error details returned by handler")) + + if not err then + self:log_err("handler returned falsy value but no error information") + end + + if (now() - start_time) > self.max_retry_time then + self:log_err( + "could not send entries due to max_retry_time exceeded. %d queue entries were lost", + entry_count) + break + end + + -- Delay before retrying. The delay time is calculated by multiplying the configured initial_retry_delay with + -- 2 to the power of the number of retries, creating an exponential increase over the course of each retry. + -- The maximum time between retries is capped by the max_retry_delay configuration parameter. + sleep(math_min(self.max_retry_delay, 2 ^ retry_count * self.initial_retry_delay)) + retry_count = retry_count + 1 + end +end + -- Process one batch of entries from the queue. Returns truthy if entries were processed, falsy if there was an -- error or no items were on the queue to be processed. @@ -387,41 +482,7 @@ function Queue:process_once() self.already_dropped_entries = false end - local start_time = now() - local retry_count = 0 - while true do - self:log_debug("passing %d entries to handler", entry_count) - local status - status, ok, err = pcall(self.handler, self.handler_conf, batch) - if status and ok == true then - self:log_debug("handler processed %d entries successfully", entry_count) - break - end - - if not status then - -- protected call failed, ok is the error message - err = ok - end - - self:log_warn("handler could not process entries: %s", tostring(err or "no error details returned by handler")) - - if not err then - self:log_err("handler returned falsy value but no error information") - end - - if (now() - start_time) > self.max_retry_time then - self:log_err( - "could not send entries due to max_retry_time exceeded. %d queue entries were lost", - entry_count) - break - end - - -- Delay before retrying. The delay time is calculated by multiplying the configured initial_retry_delay with - -- 2 to the power of the number of retries, creating an exponential increase over the course of each retry. - -- The maximum time between retries is capped by the max_retry_delay configuration parameter. - sleep(math_min(self.max_retry_delay, 2 ^ retry_count * self.initial_retry_delay)) - retry_count = retry_count + 1 - end + self:handle(batch) end @@ -574,47 +635,6 @@ end function Queue.enqueue(queue_conf, handler, handler_conf, value) - - assert(type(queue_conf) == "table", - "arg #1 (queue_conf) must be a table") - assert(type(handler) == "function", - "arg #2 (handler) must be a function") - assert(handler_conf == nil or type(handler_conf) == "table", - "arg #3 (handler_conf) must be a table or nil") - assert(type(queue_conf.name) == "string", - "arg #1 (queue_conf) must include a name") - - assert( - type(queue_conf.max_batch_size) == "number", - "arg #1 (queue_conf) max_batch_size must be a number" - ) - assert( - type(queue_conf.max_coalescing_delay) == "number", - "arg #1 (queue_conf) max_coalescing_delay must be a number" - ) - assert( - type(queue_conf.max_entries) == "number", - "arg #1 (queue_conf) max_entries must be a number" - ) - assert( - type(queue_conf.max_retry_time) == "number", - "arg #1 (queue_conf) max_retry_time must be a number" - ) - assert( - type(queue_conf.initial_retry_delay) == "number", - "arg #1 (queue_conf) initial_retry_delay must be a number" - ) - assert( - type(queue_conf.max_retry_delay) == "number", - "arg #1 (queue_conf) max_retry_delay must be a number" - ) - - local max_bytes_type = type(queue_conf.max_bytes) - assert( - max_bytes_type == "nil" or max_bytes_type == "number", - "arg #1 (queue_conf) max_bytes must be a number or nil" - ) - local queue = get_or_create_queue(queue_conf, handler, handler_conf) return enqueue(queue, value) end From 4683d93d17065bd14be64182681508c6bc929da9 Mon Sep 17 00:00:00 2001 From: Douglas Lee Date: Wed, 7 Aug 2024 13:44:27 +0800 Subject: [PATCH 16/25] feature(plugins/http-log): added a new configuration `no_queue` to indicates whether to disable the queue mechanism. --- changelog/unreleased/kong/http-log-concurrent-optimize.yml | 5 ++--- kong/clustering/compat/removed_fields.lua | 3 +++ kong/plugins/http-log/handler.lua | 2 +- kong/plugins/http-log/schema.lua | 1 + 4 files changed, 7 insertions(+), 4 deletions(-) diff --git a/changelog/unreleased/kong/http-log-concurrent-optimize.yml b/changelog/unreleased/kong/http-log-concurrent-optimize.yml index 47fa5f52d3a7..e6ca208f6e2e 100644 --- a/changelog/unreleased/kong/http-log-concurrent-optimize.yml +++ b/changelog/unreleased/kong/http-log-concurrent-optimize.yml @@ -1,4 +1,3 @@ -message: | - **HTTP-Log**: When `queue.max_batch_size` is 1, log entries are now sent in separate, parallel HTTP requests. Previously, they were sent sequentially in FIFO order. -type: bugfix +message: "**HTTP-Log**: Added a new configuration `no_queue` to indicates whether to disable the queue mechanism." +type: feature scope: Plugin diff --git a/kong/clustering/compat/removed_fields.lua b/kong/clustering/compat/removed_fields.lua index 5c1b7404fe8a..7f5f1fadb49f 100644 --- a/kong/clustering/compat/removed_fields.lua +++ b/kong/clustering/compat/removed_fields.lua @@ -210,5 +210,8 @@ return { acl = { "always_use_authenticated_groups", }, + http_log = { + "no_queue" + }, }, } diff --git a/kong/plugins/http-log/handler.lua b/kong/plugins/http-log/handler.lua index 382a0654335b..c267ddc7de3c 100644 --- a/kong/plugins/http-log/handler.lua +++ b/kong/plugins/http-log/handler.lua @@ -192,7 +192,7 @@ function HttpLogHandler:log(conf) kong.log.debug("Queue name automatically configured based on configuration parameters to: ", queue_conf.name) local entry = cjson.encode(kong.log.serialize()) - if queue_conf.max_batch_size == 1 then + if conf.no_queue then local queue = Queue.create(queue_conf, send_entries, conf) local ok, err = timer_at(0, function(premature) if premature then diff --git a/kong/plugins/http-log/schema.lua b/kong/plugins/http-log/schema.lua index 430761a5ed41..8db7219ef607 100644 --- a/kong/plugins/http-log/schema.lua +++ b/kong/plugins/http-log/schema.lua @@ -59,6 +59,7 @@ return { }}, { queue = typedefs.queue }, { custom_fields_by_lua = typedefs.lua_code }, + { no_queue = { description = "Indicates whether to disable the queue mechanism.", type = "boolean", default = false, required = true }, }, }, custom_validator = function(config) -- check no double userinfo + authorization header From 0914b62aaf498e5f5d0eb31fec011572681a8ca4 Mon Sep 17 00:00:00 2001 From: Douglas Lee Date: Wed, 7 Aug 2024 13:42:07 +0800 Subject: [PATCH 17/25] feature(plugins/http-log): added a new configuration `no_queue` to indicates whether to disable the queue mechanism. --- spec/03-plugins/03-http-log/01-log_spec.lua | 26 ++++++++++++++++----- 1 file changed, 20 insertions(+), 6 deletions(-) diff --git a/spec/03-plugins/03-http-log/01-log_spec.lua b/spec/03-plugins/03-http-log/01-log_spec.lua index 5f1a19ac4a75..c7a1a8996193 100644 --- a/spec/03-plugins/03-http-log/01-log_spec.lua +++ b/spec/03-plugins/03-http-log/01-log_spec.lua @@ -92,10 +92,7 @@ for _, strategy in helpers.each_strategy() do http_endpoint = "http://" .. helpers.mock_upstream_host .. ":" .. helpers.mock_upstream_port - .. "/post_log/http_tag", - queue = { - max_batch_size = 2, - } + .. "/post_log/http_tag" } } @@ -341,6 +338,23 @@ for _, strategy in helpers.each_strategy() do } } + local route1_4 = bp.routes:insert { + hosts = { "no_queue.test" }, + service = service1 + } + + bp.plugins:insert { + route = { id = route1_4.id }, + name = "http-log", + config = { + http_endpoint = "http://" .. helpers.mock_upstream_host + .. ":" + .. helpers.mock_upstream_port + .. "/post_log/http", + no_queue = true, + } + } + helpers.setenv(vault_env_name, vault_env_value) assert(helpers.start_kong({ @@ -642,11 +656,11 @@ for _, strategy in helpers.each_strategy() do admin_client:close() end) - it("should not use queue when max_batch_size is 1", function() + it("should not use queue when no_queue is true", function() reset_log("http") local res = proxy_client:get("/status/200", { headers = { - ["Host"] = "http_logging.test" + ["Host"] = "no_queue.test" } }) assert.res_status(200, res) From 86482ba93700f029a7d23b8d55d97011ab7adac2 Mon Sep 17 00:00:00 2001 From: Douglas Lee Date: Wed, 7 Aug 2024 14:28:01 +0800 Subject: [PATCH 18/25] update test cases --- .../11-declarative_config/02-process_auto_fields_spec.lua | 4 ++++ .../01-schema/11-declarative_config/03-flatten_spec.lua | 6 +++++- 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/spec/01-unit/01-db/01-schema/11-declarative_config/02-process_auto_fields_spec.lua b/spec/01-unit/01-db/01-schema/11-declarative_config/02-process_auto_fields_spec.lua index f12359a8aa5a..38ee81a544ac 100644 --- a/spec/01-unit/01-db/01-schema/11-declarative_config/02-process_auto_fields_spec.lua +++ b/spec/01-unit/01-db/01-schema/11-declarative_config/02-process_auto_fields_spec.lua @@ -180,6 +180,7 @@ describe("declarative config: process_auto_fields", function() max_retry_delay = 60, max_retry_time = 60, }, + no_queue = false, } }, } @@ -237,6 +238,7 @@ describe("declarative config: process_auto_fields", function() max_retry_delay = 60, max_retry_time = 60, }, + no_queue = false, } }, } @@ -354,6 +356,7 @@ describe("declarative config: process_auto_fields", function() max_retry_delay = 60, max_retry_time = 60, }, + no_queue = false, } }, } @@ -675,6 +678,7 @@ describe("declarative config: process_auto_fields", function() max_retry_delay = 60, max_retry_time = 60, }, + no_queue = false, } } } diff --git a/spec/01-unit/01-db/01-schema/11-declarative_config/03-flatten_spec.lua b/spec/01-unit/01-db/01-schema/11-declarative_config/03-flatten_spec.lua index a869822e847e..2df84438e2d8 100644 --- a/spec/01-unit/01-db/01-schema/11-declarative_config/03-flatten_spec.lua +++ b/spec/01-unit/01-db/01-schema/11-declarative_config/03-flatten_spec.lua @@ -303,6 +303,7 @@ describe("declarative config: flatten", function() max_retry_time = 60, max_bytes = null, }, + no_queue = false, } }, { @@ -410,6 +411,7 @@ describe("declarative config: flatten", function() max_retry_time = 60, max_bytes = null, }, + no_queue = false, }, consumer = { id = "UUID" @@ -611,7 +613,8 @@ describe("declarative config: flatten", function() max_retry_delay = 60, max_retry_time = 60, max_bytes = null, - } + }, + no_queue = false, }, consumer = null, created_at = 1234567890, @@ -1129,6 +1132,7 @@ describe("declarative config: flatten", function() max_retry_time = 60, max_bytes = null, }, + no_queue = false, }, consumer = null, created_at = 1234567890, From 6b9216250c75b712ceeba7f7fd876c919e9a96f9 Mon Sep 17 00:00:00 2001 From: Douglas Lee Date: Fri, 9 Aug 2024 14:47:33 +0800 Subject: [PATCH 19/25] feat(queue): add concurrency --- .../kong/http-log-concurrent-optimize.yml | 2 +- kong/clustering/compat/removed_fields.lua | 12 +- kong/plugins/http-log/handler.lua | 18 +- kong/plugins/http-log/schema.lua | 1 - kong/tools/queue.lua | 201 +++++++++--------- kong/tools/queue_schema.lua | 8 + .../02-process_auto_fields_spec.lua | 8 +- .../11-declarative_config/03-flatten_spec.lua | 8 +- spec/03-plugins/03-http-log/01-log_spec.lua | 4 +- 9 files changed, 134 insertions(+), 128 deletions(-) diff --git a/changelog/unreleased/kong/http-log-concurrent-optimize.yml b/changelog/unreleased/kong/http-log-concurrent-optimize.yml index e6ca208f6e2e..71e5de84e64f 100644 --- a/changelog/unreleased/kong/http-log-concurrent-optimize.yml +++ b/changelog/unreleased/kong/http-log-concurrent-optimize.yml @@ -1,3 +1,3 @@ -message: "**HTTP-Log**: Added a new configuration `no_queue` to indicates whether to disable the queue mechanism." +message: "**HTTP-Log**: Added a new configuration `queue.concurrency`" type: feature scope: Plugin diff --git a/kong/clustering/compat/removed_fields.lua b/kong/clustering/compat/removed_fields.lua index 7f5f1fadb49f..94bdc42de5e9 100644 --- a/kong/clustering/compat/removed_fields.lua +++ b/kong/clustering/compat/removed_fields.lua @@ -166,6 +166,7 @@ return { opentelemetry = { "traces_endpoint", "logs_endpoint", + "queue.concurrency", }, ai_proxy = { "max_request_body_size", @@ -211,7 +212,16 @@ return { "always_use_authenticated_groups", }, http_log = { - "no_queue" + "queue.concurrency", + }, + statsd = { + "queue.concurrency", + }, + datadog = { + "queue.concurrency", + }, + zipkin = { + "queue.concurrency", }, }, } diff --git a/kong/plugins/http-log/handler.lua b/kong/plugins/http-log/handler.lua index c267ddc7de3c..fd1d0cd48eeb 100644 --- a/kong/plugins/http-log/handler.lua +++ b/kong/plugins/http-log/handler.lua @@ -14,7 +14,6 @@ local tonumber = tonumber local fmt = string.format local pairs = pairs local max = math.max -local timer_at = ngx.timer.at local sandbox_opts = { env = { kong = kong, ngx = ngx } } @@ -190,27 +189,12 @@ function HttpLogHandler:log(conf) local queue_conf = Queue.get_plugin_params("http-log", conf, make_queue_name(conf)) kong.log.debug("Queue name automatically configured based on configuration parameters to: ", queue_conf.name) - local entry = cjson.encode(kong.log.serialize()) - - if conf.no_queue then - local queue = Queue.create(queue_conf, send_entries, conf) - local ok, err = timer_at(0, function(premature) - if premature then - return - end - queue:handle({ entry }) - end) - if not ok then - kong.log.err("failed to create timer: ", err) - end - return - end local ok, err = Queue.enqueue( queue_conf, send_entries, conf, - entry + cjson.encode(kong.log.serialize()) ) if not ok then kong.log.err("Failed to enqueue log entry to log server: ", err) diff --git a/kong/plugins/http-log/schema.lua b/kong/plugins/http-log/schema.lua index 8db7219ef607..430761a5ed41 100644 --- a/kong/plugins/http-log/schema.lua +++ b/kong/plugins/http-log/schema.lua @@ -59,7 +59,6 @@ return { }}, { queue = typedefs.queue }, { custom_fields_by_lua = typedefs.lua_code }, - { no_queue = { description = "Indicates whether to disable the queue mechanism.", type = "boolean", default = false, required = true }, }, }, custom_validator = function(config) -- check no double userinfo + authorization header diff --git a/kong/tools/queue.lua b/kong/tools/queue.lua index 8f45ca24ee6b..37a95ff39c7a 100644 --- a/kong/tools/queue.lua +++ b/kong/tools/queue.lua @@ -76,6 +76,7 @@ local math_min = math.min local now = ngx.now local sleep = ngx.sleep local null = ngx.null +local timer_at = ngx.timer.at local Queue = {} @@ -203,14 +204,6 @@ end -- @param opts table, requires `name`, optionally includes `retry_count`, `max_coalescing_delay` and `max_batch_size` -- @return table: a Queue object. local function get_or_create_queue(queue_conf, handler, handler_conf) - assert(type(queue_conf) == "table", - "arg #1 (queue_conf) must be a table") - assert(type(handler) == "function", - "arg #2 (handler) must be a function") - assert(handler_conf == nil or type(handler_conf) == "table", - "arg #3 (handler_conf) must be a table or nil") - assert(type(queue_conf.name) == "string", - "arg #1 (queue_conf) must include a name") local name = assert(queue_conf.name) local key = _make_queue_key(name) @@ -223,70 +216,7 @@ local function get_or_create_queue(queue_conf, handler, handler_conf) return queue end - queue = Queue.create(queue_conf, handler, handler_conf) - - kong.timer:named_at("queue " .. key, 0, function(_, q) - while q:count() > 0 do - q:log_debug("processing queue") - q:process_once() - end - q:log_debug("done processing queue") - queues[key] = nil - end, queue) - - queues[key] = queue - - queue:log_debug("queue created") - - return queue -end - -function Queue.create(queue_conf, handler, handler_conf) - assert(type(queue_conf) == "table", - "arg #1 (queue_conf) must be a table") - assert(type(handler) == "function", - "arg #2 (handler) must be a function") - assert(handler_conf == nil or type(handler_conf) == "table", - "arg #3 (handler_conf) must be a table or nil") - assert(type(queue_conf.name) == "string", - "arg #1 (queue_conf) must include a name") - - - assert( - type(queue_conf.max_batch_size) == "number", - "arg #1 (queue_conf) max_batch_size must be a number" - ) - assert( - type(queue_conf.max_coalescing_delay) == "number", - "arg #1 (queue_conf) max_coalescing_delay must be a number" - ) - assert( - type(queue_conf.max_entries) == "number", - "arg #1 (queue_conf) max_entries must be a number" - ) - assert( - type(queue_conf.max_retry_time) == "number", - "arg #1 (queue_conf) max_retry_time must be a number" - ) - assert( - type(queue_conf.initial_retry_delay) == "number", - "arg #1 (queue_conf) initial_retry_delay must be a number" - ) - assert( - type(queue_conf.max_retry_delay) == "number", - "arg #1 (queue_conf) max_retry_delay must be a number" - ) - - local max_bytes_type = type(queue_conf.max_bytes) - assert( - max_bytes_type == "nil" or max_bytes_type == "number", - "arg #1 (queue_conf) max_bytes must be a number or nil" - ) - - local name = assert(queue_conf.name) - local key = _make_queue_key(name) - - local queue = { + queue = { -- Queue parameters from the enqueue call name = name, key = key, @@ -309,7 +239,22 @@ function Queue.create(queue_conf, handler, handler_conf) queue[option] = value end - return setmetatable(queue, Queue_mt) + queue = setmetatable(queue, Queue_mt) + + kong.timer:named_at("queue " .. key, 0, function(_, q) + while q:count() > 0 do + q:log_debug("processing queue") + q:process_once() + end + q:log_debug("done processing queue") + queues[key] = nil + end, queue) + + queues[key] = queue + + queue:log_debug("queue created") + + return queue end @@ -370,31 +315,7 @@ function Queue.can_enqueue(queue_conf, entry) return _can_enqueue(queue, entry) end - --- Delete the frontmost entry from the queue and adjust the current utilization variables. -function Queue:delete_frontmost_entry() - if self.max_bytes then - -- If max_bytes is set, reduce the currently queued byte count by the - self.bytes_queued = self.bytes_queued - #self.entries[self.front] - end - self.entries[self.front] = nil - self.front = self.front + 1 - if self.front == self.back then - self.front = 1 - self.back = 1 - end -end - - --- Drop the oldest entry, adjusting the semaphore value in the process. This is --- called when the queue runs out of space and needs to make space. -function Queue:drop_oldest_entry() - assert(self.semaphore:count() > 0) - self.semaphore:wait(0) - self:delete_frontmost_entry() -end - -function Queue:handle(entries) +local function handle(self, entries) local entry_count = #entries local start_time = now() @@ -434,6 +355,30 @@ function Queue:handle(entries) end +-- Delete the frontmost entry from the queue and adjust the current utilization variables. +function Queue:delete_frontmost_entry() + if self.max_bytes then + -- If max_bytes is set, reduce the currently queued byte count by the + self.bytes_queued = self.bytes_queued - #self.entries[self.front] + end + self.entries[self.front] = nil + self.front = self.front + 1 + if self.front == self.back then + self.front = 1 + self.back = 1 + end +end + + +-- Drop the oldest entry, adjusting the semaphore value in the process. This is +-- called when the queue runs out of space and needs to make space. +function Queue:drop_oldest_entry() + assert(self.semaphore:count() > 0) + self.semaphore:wait(0) + self:delete_frontmost_entry() +end + + -- Process one batch of entries from the queue. Returns truthy if entries were processed, falsy if there was an -- error or no items were on the queue to be processed. function Queue:process_once() @@ -482,7 +427,7 @@ function Queue:process_once() self.already_dropped_entries = false end - self:handle(batch) + handle(self, batch) end @@ -567,6 +512,21 @@ local function enqueue(self, entry) return nil, "entry must be a non-nil Lua value" end + + if self.concurrency == 0 then + local ok, err = timer_at(0, function(premature) + if premature then + return + end + handle(self, { entry }) + end) + if not ok then + return nil, "failed to crete timer: " .. err + end + return true + end + + if self:count() >= self.max_entries * CAPACITY_WARNING_THRESHOLD then if not self.warned then self:log_warn('queue at %s%% capacity', CAPACITY_WARNING_THRESHOLD * 100) @@ -635,6 +595,49 @@ end function Queue.enqueue(queue_conf, handler, handler_conf, value) + + assert(type(queue_conf) == "table", + "arg #1 (queue_conf) must be a table") + assert(type(handler) == "function", + "arg #2 (handler) must be a function") + assert(handler_conf == nil or type(handler_conf) == "table", + "arg #3 (handler_conf) must be a table or nil") + assert(type(queue_conf.name) == "string", + "arg #1 (queue_conf) must include a name") + + assert( + type(queue_conf.max_batch_size) == "number", + "arg #1 (queue_conf) max_batch_size must be a number" + ) + assert( + type(queue_conf.max_coalescing_delay) == "number", + "arg #1 (queue_conf) max_coalescing_delay must be a number" + ) + assert( + type(queue_conf.max_entries) == "number", + "arg #1 (queue_conf) max_entries must be a number" + ) + assert( + type(queue_conf.max_retry_time) == "number", + "arg #1 (queue_conf) max_retry_time must be a number" + ) + assert( + type(queue_conf.initial_retry_delay) == "number", + "arg #1 (queue_conf) initial_retry_delay must be a number" + ) + assert( + type(queue_conf.max_retry_delay) == "number", + "arg #1 (queue_conf) max_retry_delay must be a number" + ) + + local max_bytes_type = type(queue_conf.max_bytes) + assert( + max_bytes_type == "nil" or max_bytes_type == "number", + "arg #1 (queue_conf) max_bytes must be a number or nil" + ) + + -- TODO: assert concurrency + local queue = get_or_create_queue(queue_conf, handler, handler_conf) return enqueue(queue, value) end diff --git a/kong/tools/queue_schema.lua b/kong/tools/queue_schema.lua index 94132ed21b5f..01720297f07a 100644 --- a/kong/tools/queue_schema.lua +++ b/kong/tools/queue_schema.lua @@ -49,5 +49,13 @@ return Schema.define { between = { 0.001, 1000000 }, -- effectively unlimited maximum description = "Maximum time in seconds between retries, caps exponential backoff." } }, + { concurrency = { + type = "integer", + default = 1, + between = { 0, 1 }, + required = true, + description = "TBD11111111" + } }, + } } diff --git a/spec/01-unit/01-db/01-schema/11-declarative_config/02-process_auto_fields_spec.lua b/spec/01-unit/01-db/01-schema/11-declarative_config/02-process_auto_fields_spec.lua index 38ee81a544ac..5a65cce5c9cc 100644 --- a/spec/01-unit/01-db/01-schema/11-declarative_config/02-process_auto_fields_spec.lua +++ b/spec/01-unit/01-db/01-schema/11-declarative_config/02-process_auto_fields_spec.lua @@ -179,8 +179,8 @@ describe("declarative config: process_auto_fields", function() max_coalescing_delay = 1, max_retry_delay = 60, max_retry_time = 60, + concurrency = 1, }, - no_queue = false, } }, } @@ -237,8 +237,8 @@ describe("declarative config: process_auto_fields", function() max_coalescing_delay = 1, max_retry_delay = 60, max_retry_time = 60, + concurrency = 1, }, - no_queue = false, } }, } @@ -355,8 +355,8 @@ describe("declarative config: process_auto_fields", function() max_coalescing_delay = 1, max_retry_delay = 60, max_retry_time = 60, + concurrency = 1, }, - no_queue = false, } }, } @@ -677,8 +677,8 @@ describe("declarative config: process_auto_fields", function() max_coalescing_delay = 1, max_retry_delay = 60, max_retry_time = 60, + concurrency = 1, }, - no_queue = false, } } } diff --git a/spec/01-unit/01-db/01-schema/11-declarative_config/03-flatten_spec.lua b/spec/01-unit/01-db/01-schema/11-declarative_config/03-flatten_spec.lua index 2df84438e2d8..6647714b880e 100644 --- a/spec/01-unit/01-db/01-schema/11-declarative_config/03-flatten_spec.lua +++ b/spec/01-unit/01-db/01-schema/11-declarative_config/03-flatten_spec.lua @@ -302,8 +302,8 @@ describe("declarative config: flatten", function() max_retry_delay = 60, max_retry_time = 60, max_bytes = null, + concurrency = 1, }, - no_queue = false, } }, { @@ -410,8 +410,8 @@ describe("declarative config: flatten", function() max_retry_delay = 60, max_retry_time = 60, max_bytes = null, + concurrency = 1, }, - no_queue = false, }, consumer = { id = "UUID" @@ -613,8 +613,8 @@ describe("declarative config: flatten", function() max_retry_delay = 60, max_retry_time = 60, max_bytes = null, + concurrency = 1, }, - no_queue = false, }, consumer = null, created_at = 1234567890, @@ -1131,8 +1131,8 @@ describe("declarative config: flatten", function() max_retry_delay = 60, max_retry_time = 60, max_bytes = null, + concurrency = 1, }, - no_queue = false, }, consumer = null, created_at = 1234567890, diff --git a/spec/03-plugins/03-http-log/01-log_spec.lua b/spec/03-plugins/03-http-log/01-log_spec.lua index c7a1a8996193..2175e31a1f48 100644 --- a/spec/03-plugins/03-http-log/01-log_spec.lua +++ b/spec/03-plugins/03-http-log/01-log_spec.lua @@ -351,7 +351,9 @@ for _, strategy in helpers.each_strategy() do .. ":" .. helpers.mock_upstream_port .. "/post_log/http", - no_queue = true, + queue = { + concurrency = 0, + }, } } From 744c5e6fafc0b1c077c4fcac64e5888dd3c25c4d Mon Sep 17 00:00:00 2001 From: Douglas Lee Date: Fri, 9 Aug 2024 18:08:07 +0800 Subject: [PATCH 20/25] fix: only cache queue when concurrency is 1 --- kong/tools/queue.lua | 25 +++++++++++++++---------- 1 file changed, 15 insertions(+), 10 deletions(-) diff --git a/kong/tools/queue.lua b/kong/tools/queue.lua index 37a95ff39c7a..5a17e056649c 100644 --- a/kong/tools/queue.lua +++ b/kong/tools/queue.lua @@ -241,16 +241,18 @@ local function get_or_create_queue(queue_conf, handler, handler_conf) queue = setmetatable(queue, Queue_mt) - kong.timer:named_at("queue " .. key, 0, function(_, q) - while q:count() > 0 do - q:log_debug("processing queue") - q:process_once() - end - q:log_debug("done processing queue") - queues[key] = nil - end, queue) + if queue.concurrency == 1 then + kong.timer:named_at("queue " .. key, 0, function(_, q) + while q:count() > 0 do + q:log_debug("processing queue") + q:process_once() + end + q:log_debug("done processing queue") + queues[key] = nil + end, queue) + queues[key] = queue + end - queues[key] = queue queue:log_debug("queue created") @@ -636,7 +638,10 @@ function Queue.enqueue(queue_conf, handler, handler_conf, value) "arg #1 (queue_conf) max_bytes must be a number or nil" ) - -- TODO: assert concurrency + assert( + type(queue_conf.concurrency) == "number", + "arg #1 (queue_conf) concurrency must be a number" + ) local queue = get_or_create_queue(queue_conf, handler, handler_conf) return enqueue(queue, value) From 81cf796f2ab2af13a4efc267a56eef23429e3fed Mon Sep 17 00:00:00 2001 From: Douglas Lee Date: Mon, 12 Aug 2024 15:23:49 +0800 Subject: [PATCH 21/25] update --- .../unreleased/kong/feat-queue-concurrency-limit.yml | 5 +++++ .../unreleased/kong/http-log-concurrent-optimize.yml | 3 --- kong/clustering/compat/removed_fields.lua | 10 +++++----- kong/tools/queue.lua | 10 +++++----- kong/tools/queue_schema.lua | 7 +++---- spec/01-unit/27-queue_spec.lua | 3 +++ .../09-hybrid_mode/09-config-compat_spec.lua | 4 ++++ 7 files changed, 25 insertions(+), 17 deletions(-) create mode 100644 changelog/unreleased/kong/feat-queue-concurrency-limit.yml delete mode 100644 changelog/unreleased/kong/http-log-concurrent-optimize.yml diff --git a/changelog/unreleased/kong/feat-queue-concurrency-limit.yml b/changelog/unreleased/kong/feat-queue-concurrency-limit.yml new file mode 100644 index 000000000000..7dbfc5e9b9ff --- /dev/null +++ b/changelog/unreleased/kong/feat-queue-concurrency-limit.yml @@ -0,0 +1,5 @@ +message: | + Added a new configuration `concurrency_limit`(integer, default to 1) for Queue to specify the number of delivery timers. + Note that setting concurrency_limit to -1 means no limit, and each HTTP Log will create a individual timer to send. +type: feature +scope: Core diff --git a/changelog/unreleased/kong/http-log-concurrent-optimize.yml b/changelog/unreleased/kong/http-log-concurrent-optimize.yml deleted file mode 100644 index 71e5de84e64f..000000000000 --- a/changelog/unreleased/kong/http-log-concurrent-optimize.yml +++ /dev/null @@ -1,3 +0,0 @@ -message: "**HTTP-Log**: Added a new configuration `queue.concurrency`" -type: feature -scope: Plugin diff --git a/kong/clustering/compat/removed_fields.lua b/kong/clustering/compat/removed_fields.lua index 94bdc42de5e9..94a33aac1526 100644 --- a/kong/clustering/compat/removed_fields.lua +++ b/kong/clustering/compat/removed_fields.lua @@ -166,7 +166,7 @@ return { opentelemetry = { "traces_endpoint", "logs_endpoint", - "queue.concurrency", + "queue.concurrency_limit", }, ai_proxy = { "max_request_body_size", @@ -212,16 +212,16 @@ return { "always_use_authenticated_groups", }, http_log = { - "queue.concurrency", + "queue.concurrency_limit", }, statsd = { - "queue.concurrency", + "queue.concurrency_limit", }, datadog = { - "queue.concurrency", + "queue.concurrency_limit", }, zipkin = { - "queue.concurrency", + "queue.concurrency_limit", }, }, } diff --git a/kong/tools/queue.lua b/kong/tools/queue.lua index 5a17e056649c..e96244a522e8 100644 --- a/kong/tools/queue.lua +++ b/kong/tools/queue.lua @@ -241,7 +241,7 @@ local function get_or_create_queue(queue_conf, handler, handler_conf) queue = setmetatable(queue, Queue_mt) - if queue.concurrency == 1 then + if queue.concurrency_limit == 1 then kong.timer:named_at("queue " .. key, 0, function(_, q) while q:count() > 0 do q:log_debug("processing queue") @@ -514,8 +514,8 @@ local function enqueue(self, entry) return nil, "entry must be a non-nil Lua value" end - - if self.concurrency == 0 then + if self.concurrency_limit == -1 then -- unlimited concurrency + -- do not enqueue when concurrency_limit is unlimited local ok, err = timer_at(0, function(premature) if premature then return @@ -639,8 +639,8 @@ function Queue.enqueue(queue_conf, handler, handler_conf, value) ) assert( - type(queue_conf.concurrency) == "number", - "arg #1 (queue_conf) concurrency must be a number" + type(queue_conf.concurrency_limit) == "number", + "arg #1 (queue_conf) concurrency_limit must be a number" ) local queue = get_or_create_queue(queue_conf, handler, handler_conf) diff --git a/kong/tools/queue_schema.lua b/kong/tools/queue_schema.lua index 01720297f07a..51d73981bd8b 100644 --- a/kong/tools/queue_schema.lua +++ b/kong/tools/queue_schema.lua @@ -49,12 +49,11 @@ return Schema.define { between = { 0.001, 1000000 }, -- effectively unlimited maximum description = "Maximum time in seconds between retries, caps exponential backoff." } }, - { concurrency = { + { concurrency_limit = { type = "integer", default = 1, - between = { 0, 1 }, - required = true, - description = "TBD11111111" + one_of = { -1, 1 }, + description = "The number of of queue delivery timers. -1 indicates unlimited." } }, } diff --git a/spec/01-unit/27-queue_spec.lua b/spec/01-unit/27-queue_spec.lua index 5f94c9d73094..5d9eeeea7e7f 100644 --- a/spec/01-unit/27-queue_spec.lua +++ b/spec/01-unit/27-queue_spec.lua @@ -799,6 +799,7 @@ describe("plugin queue", function() max_retry_time = 60, initial_retry_delay = 1, max_retry_delay = 60, + concurrency_limit = 1, } local function enqueue(queue_conf, entry) @@ -836,6 +837,7 @@ describe("plugin queue", function() max_retry_time = 60, initial_retry_delay = 1, max_retry_delay = 60, + concurrency_limit = 1, } -- should be true if the queue does not exist @@ -861,6 +863,7 @@ describe("plugin queue", function() max_retry_time = 60, initial_retry_delay = 1, max_retry_delay = 60, + concurrency_limit = 1, } -- should be true if the queue does not exist diff --git a/spec/02-integration/09-hybrid_mode/09-config-compat_spec.lua b/spec/02-integration/09-hybrid_mode/09-config-compat_spec.lua index 9eecc8ec7a45..15ae94852017 100644 --- a/spec/02-integration/09-hybrid_mode/09-config-compat_spec.lua +++ b/spec/02-integration/09-hybrid_mode/09-config-compat_spec.lua @@ -256,6 +256,7 @@ describe("CP/DP config compat transformations #" .. strategy, function() expected_otel_prior_35.config.traces_endpoint = nil expected_otel_prior_35.config.logs_endpoint = nil expected_otel_prior_35.config.endpoint = "http://1.1.1.1:12345/v1/trace" + expected_otel_prior_35.config.queue.concurrency_limit = nil do_assert(uuid(), "3.4.0", expected_otel_prior_35) @@ -281,6 +282,7 @@ describe("CP/DP config compat transformations #" .. strategy, function() expected_otel_prior_34.config.traces_endpoint = nil expected_otel_prior_34.config.logs_endpoint = nil expected_otel_prior_34.config.endpoint = "http://1.1.1.1:12345/v1/trace" + expected_otel_prior_34.config.queue.concurrency_limit = nil do_assert(uuid(), "3.3.0", expected_otel_prior_34) -- cleanup @@ -307,6 +309,7 @@ describe("CP/DP config compat transformations #" .. strategy, function() expected_zipkin_prior_35.config.header_type = "preserve" expected_zipkin_prior_35.config.default_header_type = "b3" expected_zipkin_prior_35.config.propagation = nil + expected_zipkin_prior_35.config.queue.concurrency_limit = nil do_assert(uuid(), "3.4.0", expected_zipkin_prior_35) -- cleanup @@ -328,6 +331,7 @@ describe("CP/DP config compat transformations #" .. strategy, function() expected_zipkin_prior_34.config.header_type = "preserve" expected_zipkin_prior_34.config.default_header_type = "b3" expected_zipkin_prior_34.config.propagation = nil + expected_zipkin_prior_34.config.queue.concurrency_limit = nil do_assert(uuid(), "3.3.0", expected_zipkin_prior_34) -- cleanup From d57f153a008f2c225059de213d021004221602ed Mon Sep 17 00:00:00 2001 From: Yusheng Li Date: Mon, 12 Aug 2024 15:25:55 +0800 Subject: [PATCH 22/25] Apply suggestions from code review --- .../11-declarative_config/02-process_auto_fields_spec.lua | 8 ++++---- .../01-schema/11-declarative_config/03-flatten_spec.lua | 8 ++++---- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/spec/01-unit/01-db/01-schema/11-declarative_config/02-process_auto_fields_spec.lua b/spec/01-unit/01-db/01-schema/11-declarative_config/02-process_auto_fields_spec.lua index 5a65cce5c9cc..44660491190d 100644 --- a/spec/01-unit/01-db/01-schema/11-declarative_config/02-process_auto_fields_spec.lua +++ b/spec/01-unit/01-db/01-schema/11-declarative_config/02-process_auto_fields_spec.lua @@ -179,7 +179,7 @@ describe("declarative config: process_auto_fields", function() max_coalescing_delay = 1, max_retry_delay = 60, max_retry_time = 60, - concurrency = 1, + concurrency_limit = 1, }, } }, @@ -237,7 +237,7 @@ describe("declarative config: process_auto_fields", function() max_coalescing_delay = 1, max_retry_delay = 60, max_retry_time = 60, - concurrency = 1, + concurrency_limit = 1, }, } }, @@ -355,7 +355,7 @@ describe("declarative config: process_auto_fields", function() max_coalescing_delay = 1, max_retry_delay = 60, max_retry_time = 60, - concurrency = 1, + concurrency_limit = 1, }, } }, @@ -677,7 +677,7 @@ describe("declarative config: process_auto_fields", function() max_coalescing_delay = 1, max_retry_delay = 60, max_retry_time = 60, - concurrency = 1, + concurrency_limit = 1, }, } } diff --git a/spec/01-unit/01-db/01-schema/11-declarative_config/03-flatten_spec.lua b/spec/01-unit/01-db/01-schema/11-declarative_config/03-flatten_spec.lua index 6647714b880e..da3beb80d698 100644 --- a/spec/01-unit/01-db/01-schema/11-declarative_config/03-flatten_spec.lua +++ b/spec/01-unit/01-db/01-schema/11-declarative_config/03-flatten_spec.lua @@ -302,7 +302,7 @@ describe("declarative config: flatten", function() max_retry_delay = 60, max_retry_time = 60, max_bytes = null, - concurrency = 1, + concurrency_limit = 1, }, } }, @@ -410,7 +410,7 @@ describe("declarative config: flatten", function() max_retry_delay = 60, max_retry_time = 60, max_bytes = null, - concurrency = 1, + concurrency_limit = 1, }, }, consumer = { @@ -613,7 +613,7 @@ describe("declarative config: flatten", function() max_retry_delay = 60, max_retry_time = 60, max_bytes = null, - concurrency = 1, + concurrency_limit = 1, }, }, consumer = null, @@ -1131,7 +1131,7 @@ describe("declarative config: flatten", function() max_retry_delay = 60, max_retry_time = 60, max_bytes = null, - concurrency = 1, + concurrency_limit = 1, }, }, consumer = null, From 1881f6fa3cb1ab09ec653e8e1447db2ee9af41ae Mon Sep 17 00:00:00 2001 From: Douglas Lee Date: Mon, 12 Aug 2024 15:48:56 +0800 Subject: [PATCH 23/25] update test case --- spec/03-plugins/03-http-log/01-log_spec.lua | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/spec/03-plugins/03-http-log/01-log_spec.lua b/spec/03-plugins/03-http-log/01-log_spec.lua index 2175e31a1f48..47136e42bd0a 100644 --- a/spec/03-plugins/03-http-log/01-log_spec.lua +++ b/spec/03-plugins/03-http-log/01-log_spec.lua @@ -352,7 +352,7 @@ for _, strategy in helpers.each_strategy() do .. helpers.mock_upstream_port .. "/post_log/http", queue = { - concurrency = 0, + concurrency_limit = -1, }, } } @@ -658,7 +658,7 @@ for _, strategy in helpers.each_strategy() do admin_client:close() end) - it("should not use queue when no_queue is true", function() + it("should not use queue when queue.concurrency_limit is -1", function() reset_log("http") local res = proxy_client:get("/status/200", { headers = { From eeb640641f12e5a8b2b1819054572b13811494b7 Mon Sep 17 00:00:00 2001 From: Douglas Lee Date: Mon, 12 Aug 2024 16:02:29 +0800 Subject: [PATCH 24/25] remove ngx.timer.at --- kong/tools/queue.lua | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/kong/tools/queue.lua b/kong/tools/queue.lua index e96244a522e8..37284862b48e 100644 --- a/kong/tools/queue.lua +++ b/kong/tools/queue.lua @@ -76,7 +76,6 @@ local math_min = math.min local now = ngx.now local sleep = ngx.sleep local null = ngx.null -local timer_at = ngx.timer.at local Queue = {} @@ -516,7 +515,7 @@ local function enqueue(self, entry) if self.concurrency_limit == -1 then -- unlimited concurrency -- do not enqueue when concurrency_limit is unlimited - local ok, err = timer_at(0, function(premature) + local ok, err = kong.timer:at(0, function(premature) if premature then return end From ff1a7e17e1cfe222e45fcf58b099f3305893f3ae Mon Sep 17 00:00:00 2001 From: Yusheng Li Date: Tue, 13 Aug 2024 11:30:55 +0800 Subject: [PATCH 25/25] Update changelog/unreleased/kong/feat-queue-concurrency-limit.yml Co-authored-by: Zachary Hu <6426329+outsinre@users.noreply.github.com> --- changelog/unreleased/kong/feat-queue-concurrency-limit.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/changelog/unreleased/kong/feat-queue-concurrency-limit.yml b/changelog/unreleased/kong/feat-queue-concurrency-limit.yml index 7dbfc5e9b9ff..57ffc3c621f3 100644 --- a/changelog/unreleased/kong/feat-queue-concurrency-limit.yml +++ b/changelog/unreleased/kong/feat-queue-concurrency-limit.yml @@ -1,5 +1,5 @@ message: | Added a new configuration `concurrency_limit`(integer, default to 1) for Queue to specify the number of delivery timers. - Note that setting concurrency_limit to -1 means no limit, and each HTTP Log will create a individual timer to send. + Note that setting `concurrency_limit` to `-1` means no limit at all, and each HTTP log entry would create an individual timer for sending. type: feature scope: Core