Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(queue): added a new configuration concurrency_limit(integer, default to 1) for Queue to specify the number of delivery timers #13332

Merged
merged 25 commits into from
Aug 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
bebb829
fix(plugins/http-log): improve concurrency when max_batch_size is set…
vm-001 Jul 4, 2024
390e3d3
add changelog
vm-001 Jul 4, 2024
8bc3c12
add test case
vm-001 Jul 4, 2024
5d10962
update
vm-001 Jul 4, 2024
445fcf7
Update changelog/unreleased/kong/http-log-Improvement.yml
vm-001 Jul 4, 2024
43d9df9
modify changelog file name
vm-001 Jul 4, 2024
41f46bf
update log message
vm-001 Jul 4, 2024
fcd4789
update test case
vm-001 Jul 4, 2024
e3305ca
why do we need to change this to 7 to make ti test passed?
vm-001 Jul 4, 2024
6e10ce4
code style: localize
vm-001 Jul 4, 2024
8baf023
fix test case
vm-001 Jul 5, 2024
9fb05bf
Update spec/03-plugins/03-http-log/01-log_spec.lua
vm-001 Jul 5, 2024
0ed7365
Update spec/03-plugins/03-http-log/01-log_spec.lua
vm-001 Jul 5, 2024
51ca784
update message
vm-001 Jul 5, 2024
368b0be
fix(plugins/http-log): improve concurrency when max_batch_size is set…
vm-001 Jul 17, 2024
4683d93
feature(plugins/http-log): added a new configuration `no_queue` to in…
vm-001 Aug 7, 2024
0914b62
feature(plugins/http-log): added a new configuration `no_queue` to in…
vm-001 Aug 7, 2024
86482ba
update test cases
vm-001 Aug 7, 2024
6b92162
feat(queue): add concurrency
vm-001 Aug 9, 2024
744c5e6
fix: only cache queue when concurrency is 1
vm-001 Aug 9, 2024
81cf796
update
vm-001 Aug 12, 2024
d57f153
Apply suggestions from code review
vm-001 Aug 12, 2024
1881f6f
update test case
vm-001 Aug 12, 2024
eeb6406
remove ngx.timer.at
vm-001 Aug 12, 2024
ff1a7e1
Update changelog/unreleased/kong/feat-queue-concurrency-limit.yml
vm-001 Aug 13, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions changelog/unreleased/kong/feat-queue-concurrency-limit.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
message: |
Added a new configuration `concurrency_limit` (integer, defaults to 1) for Queue to specify the number of delivery timers.
Note that setting `concurrency_limit` to `-1` means no limit at all, and each HTTP log entry would create an individual timer for sending.
type: feature
scope: Core
13 changes: 13 additions & 0 deletions kong/clustering/compat/removed_fields.lua
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ return {
opentelemetry = {
"traces_endpoint",
"logs_endpoint",
"queue.concurrency_limit",
},
ai_proxy = {
"max_request_body_size",
Expand Down Expand Up @@ -210,5 +211,17 @@ return {
acl = {
"always_use_authenticated_groups",
},
http_log = {
"queue.concurrency_limit",
},
statsd = {
"queue.concurrency_limit",
},
datadog = {
"queue.concurrency_limit",
},
zipkin = {
"queue.concurrency_limit",
},
},
}
115 changes: 71 additions & 44 deletions kong/tools/queue.lua
Original file line number Diff line number Diff line change
Expand Up @@ -240,16 +240,18 @@ local function get_or_create_queue(queue_conf, handler, handler_conf)

queue = setmetatable(queue, Queue_mt)

kong.timer:named_at("queue " .. key, 0, function(_, q)
while q:count() > 0 do
q:log_debug("processing queue")
q:process_once()
end
q:log_debug("done processing queue")
queues[key] = nil
end, queue)
if queue.concurrency_limit == 1 then
kong.timer:named_at("queue " .. key, 0, function(_, q)
while q:count() > 0 do
q:log_debug("processing queue")
vm-001 marked this conversation as resolved.
Show resolved Hide resolved
q:process_once()
end
q:log_debug("done processing queue")
vm-001 marked this conversation as resolved.
Show resolved Hide resolved
queues[key] = nil
end, queue)
queues[key] = queue
end

queues[key] = queue

queue:log_debug("queue created")

Expand Down Expand Up @@ -314,6 +316,45 @@ function Queue.can_enqueue(queue_conf, entry)
return _can_enqueue(queue, entry)
end

-- Deliver a batch of entries to the configured handler, retrying with
-- exponential backoff until the handler succeeds or the queue's
-- max_retry_time budget is exhausted (in which case the batch is dropped
-- and an error is logged).
local function handle(self, entries)
  local n_entries = #entries
  local started_at = now()
  local attempts = 0

  while true do
    self:log_debug("passing %d entries to handler", n_entries)
    local call_ok, ret, err = pcall(self.handler, self.handler_conf, entries)

    if call_ok and ret == true then
      self:log_debug("handler processed %d entries successfully", n_entries)
      return
    end

    if not call_ok then
      -- the protected call itself raised; its second result is the error message
      err = ret
    end

    self:log_warn("handler could not process entries: %s", tostring(err or "no error details returned by handler"))

    if not err then
      self:log_err("handler returned falsy value but no error information")
    end

    if (now() - started_at) > self.max_retry_time then
      self:log_err(
        "could not send entries due to max_retry_time exceeded. %d queue entries were lost",
        n_entries)
      return
    end

    -- Exponential backoff: initial_retry_delay doubled on every attempt,
    -- capped by the max_retry_delay configuration parameter.
    sleep(math_min(self.max_retry_delay, 2 ^ attempts * self.initial_retry_delay))
    attempts = attempts + 1
  end
end


-- Delete the frontmost entry from the queue and adjust the current utilization variables.
function Queue:delete_frontmost_entry()
Expand Down Expand Up @@ -387,41 +428,7 @@ function Queue:process_once()
self.already_dropped_entries = false
end

local start_time = now()
local retry_count = 0
while true do
self:log_debug("passing %d entries to handler", entry_count)
local status
status, ok, err = pcall(self.handler, self.handler_conf, batch)
if status and ok == true then
self:log_debug("handler processed %d entries successfully", entry_count)
break
end

if not status then
-- protected call failed, ok is the error message
err = ok
end

self:log_warn("handler could not process entries: %s", tostring(err or "no error details returned by handler"))

if not err then
self:log_err("handler returned falsy value but no error information")
end

if (now() - start_time) > self.max_retry_time then
self:log_err(
"could not send entries, giving up after %d retries. %d queue entries were lost",
retry_count, entry_count)
break
end

-- Delay before retrying. The delay time is calculated by multiplying the configured initial_retry_delay with
-- 2 to the power of the number of retries, creating an exponential increase over the course of each retry.
-- The maximum time between retries is capped by the max_retry_delay configuration parameter.
sleep(math_min(self.max_retry_delay, 2 ^ retry_count * self.initial_retry_delay))
retry_count = retry_count + 1
end
handle(self, batch)
end


Expand Down Expand Up @@ -506,6 +513,21 @@ local function enqueue(self, entry)
return nil, "entry must be a non-nil Lua value"
end

if self.concurrency_limit == -1 then -- unlimited concurrency
outsinre marked this conversation as resolved.
Show resolved Hide resolved
-- do not enqueue when concurrency_limit is unlimited
local ok, err = kong.timer:at(0, function(premature)
if premature then
return
end
handle(self, { entry })
end)
if not ok then
return nil, "failed to create timer: " .. err
end
return true
end


if self:count() >= self.max_entries * CAPACITY_WARNING_THRESHOLD then
if not self.warned then
self:log_warn('queue at %s%% capacity', CAPACITY_WARNING_THRESHOLD * 100)
Expand Down Expand Up @@ -615,6 +637,11 @@ function Queue.enqueue(queue_conf, handler, handler_conf, value)
"arg #1 (queue_conf) max_bytes must be a number or nil"
)

assert(
type(queue_conf.concurrency_limit) == "number",
"arg #1 (queue_conf) concurrency_limit must be a number"
)

local queue = get_or_create_queue(queue_conf, handler, handler_conf)
return enqueue(queue, value)
end
Expand Down
7 changes: 7 additions & 0 deletions kong/tools/queue_schema.lua
Original file line number Diff line number Diff line change
Expand Up @@ -49,5 +49,12 @@ return Schema.define {
between = { 0.001, 1000000 }, -- effectively unlimited maximum
description = "Maximum time in seconds between retries, caps exponential backoff."
} },
{ concurrency_limit = {
type = "integer",
default = 1,
one_of = { -1, 1 },
vm-001 marked this conversation as resolved.
Show resolved Hide resolved
description = "The number of queue delivery timers. -1 indicates unlimited."
} },

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@ describe("declarative config: process_auto_fields", function()
max_coalescing_delay = 1,
max_retry_delay = 60,
max_retry_time = 60,
concurrency_limit = 1,
},
}
},
Expand Down Expand Up @@ -236,6 +237,7 @@ describe("declarative config: process_auto_fields", function()
max_coalescing_delay = 1,
max_retry_delay = 60,
max_retry_time = 60,
concurrency_limit = 1,
},
}
},
Expand Down Expand Up @@ -353,6 +355,7 @@ describe("declarative config: process_auto_fields", function()
max_coalescing_delay = 1,
max_retry_delay = 60,
max_retry_time = 60,
concurrency_limit = 1,
},
}
},
Expand Down Expand Up @@ -674,6 +677,7 @@ describe("declarative config: process_auto_fields", function()
max_coalescing_delay = 1,
max_retry_delay = 60,
max_retry_time = 60,
concurrency_limit = 1,
},
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,7 @@ describe("declarative config: flatten", function()
max_retry_delay = 60,
max_retry_time = 60,
max_bytes = null,
concurrency_limit = 1,
},
}
},
Expand Down Expand Up @@ -409,6 +410,7 @@ describe("declarative config: flatten", function()
max_retry_delay = 60,
max_retry_time = 60,
max_bytes = null,
concurrency_limit = 1,
},
},
consumer = {
Expand Down Expand Up @@ -611,7 +613,8 @@ describe("declarative config: flatten", function()
max_retry_delay = 60,
max_retry_time = 60,
max_bytes = null,
}
concurrency_limit = 1,
},
},
consumer = null,
created_at = 1234567890,
Expand Down Expand Up @@ -1128,6 +1131,7 @@ describe("declarative config: flatten", function()
max_retry_delay = 60,
max_retry_time = 60,
max_bytes = null,
concurrency_limit = 1,
},
},
consumer = null,
Expand Down
5 changes: 4 additions & 1 deletion spec/01-unit/27-queue_spec.lua
Original file line number Diff line number Diff line change
Expand Up @@ -786,7 +786,7 @@ describe("plugin queue", function()
assert.equal("One", processed[1])
assert.equal("Three", processed[2])
assert.match_re(log_messages, 'WARN \\[\\] queue continue-processing: handler could not process entries: .*: hard error')
assert.match_re(log_messages, 'ERR \\[\\] queue continue-processing: could not send entries, giving up after \\d retries. 1 queue entries were lost')
assert.match_re(log_messages, 'ERR \\[\\] queue continue-processing: could not send entries due to max_retry_time exceeded. \\d queue entries were lost')
end)

it("sanity check for function Queue.is_full() & Queue.can_enqueue()", function()
Expand All @@ -799,6 +799,7 @@ describe("plugin queue", function()
max_retry_time = 60,
initial_retry_delay = 1,
max_retry_delay = 60,
concurrency_limit = 1,
}

local function enqueue(queue_conf, entry)
Expand Down Expand Up @@ -836,6 +837,7 @@ describe("plugin queue", function()
max_retry_time = 60,
initial_retry_delay = 1,
max_retry_delay = 60,
concurrency_limit = 1,
}

-- should be true if the queue does not exist
Expand All @@ -861,6 +863,7 @@ describe("plugin queue", function()
max_retry_time = 60,
initial_retry_delay = 1,
max_retry_delay = 60,
concurrency_limit = 1,
}

-- should be true if the queue does not exist
Expand Down
4 changes: 4 additions & 0 deletions spec/02-integration/09-hybrid_mode/09-config-compat_spec.lua
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,7 @@ describe("CP/DP config compat transformations #" .. strategy, function()
expected_otel_prior_35.config.traces_endpoint = nil
expected_otel_prior_35.config.logs_endpoint = nil
expected_otel_prior_35.config.endpoint = "http://1.1.1.1:12345/v1/trace"
expected_otel_prior_35.config.queue.concurrency_limit = nil

do_assert(uuid(), "3.4.0", expected_otel_prior_35)

Expand All @@ -281,6 +282,7 @@ describe("CP/DP config compat transformations #" .. strategy, function()
expected_otel_prior_34.config.traces_endpoint = nil
expected_otel_prior_34.config.logs_endpoint = nil
expected_otel_prior_34.config.endpoint = "http://1.1.1.1:12345/v1/trace"
expected_otel_prior_34.config.queue.concurrency_limit = nil
do_assert(uuid(), "3.3.0", expected_otel_prior_34)

-- cleanup
Expand All @@ -307,6 +309,7 @@ describe("CP/DP config compat transformations #" .. strategy, function()
expected_zipkin_prior_35.config.header_type = "preserve"
expected_zipkin_prior_35.config.default_header_type = "b3"
expected_zipkin_prior_35.config.propagation = nil
expected_zipkin_prior_35.config.queue.concurrency_limit = nil
do_assert(uuid(), "3.4.0", expected_zipkin_prior_35)

-- cleanup
Expand All @@ -328,6 +331,7 @@ describe("CP/DP config compat transformations #" .. strategy, function()
expected_zipkin_prior_34.config.header_type = "preserve"
expected_zipkin_prior_34.config.default_header_type = "b3"
expected_zipkin_prior_34.config.propagation = nil
expected_zipkin_prior_34.config.queue.concurrency_limit = nil
do_assert(uuid(), "3.3.0", expected_zipkin_prior_34)

-- cleanup
Expand Down
33 changes: 33 additions & 0 deletions spec/03-plugins/03-http-log/01-log_spec.lua
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,25 @@ for _, strategy in helpers.each_strategy() do
}
}

-- Route/plugin pair used to exercise the unqueued delivery path
-- (queue.concurrency_limit = -1 means one timer per log entry).
local route1_4 = bp.routes:insert {
  hosts = { "no_queue.test" },
  service = service1
}

local no_queue_endpoint = "http://" .. helpers.mock_upstream_host
                          .. ":"
                          .. helpers.mock_upstream_port
                          .. "/post_log/http"

bp.plugins:insert {
  route = { id = route1_4.id },
  name = "http-log",
  config = {
    http_endpoint = no_queue_endpoint,
    queue = { concurrency_limit = -1 },
  }
}

helpers.setenv(vault_env_name, vault_env_value)

assert(helpers.start_kong({
Expand Down Expand Up @@ -638,6 +657,20 @@ for _, strategy in helpers.each_strategy() do

admin_client:close()
end)

it("should not use queue when queue.concurrency_limit is -1", function()
  -- With unlimited concurrency each log entry is sent from its own timer,
  -- so the queue machinery (and its "processing queue" debug line) must
  -- never run for this route.
  reset_log("http")

  local request_opts = { headers = { ["Host"] = "no_queue.test" } }
  local response = proxy_client:get("/status/200", request_opts)
  assert.res_status(200, response)

  local logged = get_log("http", 1)
  assert.same("127.0.0.1", logged[1].client_ip)
  assert.logfile().has.no.line("processing queue", true)
end)
end)

-- test both with a single worker for a deterministic test,
Expand Down
Loading