Skip to content

Commit

Permalink
Revert "fix(dns): eliminate asynchronous timer in syncQuery() to pr…
Browse files Browse the repository at this point in the history
…event deadlock risk (#11900)"

This reverts commit 30c178f.
  • Loading branch information
AndyZhang0707 committed Jul 26, 2024
1 parent b3745af commit a6b24fe
Show file tree
Hide file tree
Showing 5 changed files with 121 additions and 134 deletions.
3 changes: 0 additions & 3 deletions changelog/unreleased/kong/fix_dns_blocking.yml

This file was deleted.

143 changes: 69 additions & 74 deletions kong/resty/dns/client.lua
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,10 @@ local time = ngx.now
local log = ngx.log
local ERR = ngx.ERR
local WARN = ngx.WARN
local ALERT = ngx.ALERT
local DEBUG = ngx.DEBUG
local PREFIX = "[dns-client] "
local timer_at = ngx.timer.at
local get_phase = ngx.get_phase

local math_min = math.min
local math_max = math.max
Expand Down Expand Up @@ -637,9 +637,7 @@ _M.init = function(options)

config = options -- store it in our module level global

-- maximum time to wait for the dns resolver to hit its timeouts
-- + 1s to ensure some delay in timer execution and semaphore return are accounted for
resolve_max_wait = options.timeout / 1000 * options.retrans + 1
resolve_max_wait = options.timeout / 1000 * options.retrans -- maximum time to wait for the dns resolver to hit its timeouts

return true
end
Expand Down Expand Up @@ -723,62 +721,44 @@ local function individualQuery(qname, r_opts, try_list)
return result, nil, try_list
end

local queue = setmetatable({}, {__mode = "v"})

local function enqueue_query(key, qname, r_opts, try_list)
local item = {
key = key,
semaphore = semaphore(),
qname = qname,
r_opts = deepcopy(r_opts),
try_list = try_list,
expire_time = time() + resolve_max_wait,
}
queue[key] = item
return item
end


local function dequeue_query(item)
if queue[item.key] == item then
-- query done, but by now many others might be waiting for our result.
-- 1) stop new ones from adding to our lock/semaphore
queue[item.key] = nil
-- 2) release all waiting threads
item.semaphore:post(math_max(item.semaphore:count() * -1, 1))
item.semaphore = nil
end
end


local function queue_get_query(key, try_list)
local item = queue[key]

if not item then
return nil
end

-- bug checks: release it actively if the waiting query queue is blocked
if item.expire_time < time() then
local err = "stale query, key:" .. key
add_status_to_try_list(try_list, err)
log(ALERT, PREFIX, err)
dequeue_query(item)
return nil
end

return item
end


local queue = setmetatable({}, {__mode = "v"})
-- to be called as a timer-callback, performs a query and returns the results
-- in the `item` table.
local function executeQuery(premature, item)
if premature then return end

item.result, item.err = individualQuery(item.qname, item.r_opts, item.try_list)
local r, err = resolver:new(config)
if not r then
item.result, item.err = r, "failed to create a resolver: " .. err
else
--[[
log(DEBUG, PREFIX, "Query executing: ", item.qname, ":", item.r_opts.qtype, " ", fquery(item))
--]]
add_status_to_try_list(item.try_list, "querying")
item.result, item.err = r:query(item.qname, item.r_opts)
if item.result then
--[[
log(DEBUG, PREFIX, "Query answer: ", item.qname, ":", item.r_opts.qtype, " ", fquery(item),
" ", frecord(item.result))
--]]
parseAnswer(item.qname, item.r_opts.qtype, item.result, item.try_list)
--[[
log(DEBUG, PREFIX, "Query parsed answer: ", item.qname, ":", item.r_opts.qtype, " ", fquery(item),
" ", frecord(item.result))
else
log(DEBUG, PREFIX, "Query error: ", item.qname, ":", item.r_opts.qtype, " err=", tostring(err))
--]]
end
end

dequeue_query(item)
-- query done, but by now many others might be waiting for our result.
-- 1) stop new ones from adding to our lock/semaphore
queue[item.key] = nil
-- 2) release all waiting threads
item.semaphore:post(math_max(item.semaphore:count() * -1, 1))
item.semaphore = nil
ngx.sleep(0)
end


Expand All @@ -792,7 +772,7 @@ end
-- the `semaphore` field will be removed). Upon error it returns `nil+error`.
local function asyncQuery(qname, r_opts, try_list)
local key = qname..":"..r_opts.qtype
local item = queue_get_query(key, try_list)
local item = queue[key]
if item then
--[[
log(DEBUG, PREFIX, "Query async (exists): ", key, " ", fquery(item))
Expand All @@ -801,7 +781,14 @@ local function asyncQuery(qname, r_opts, try_list)
return item -- already in progress, return existing query
end

item = enqueue_query(key, qname, r_opts, try_list)
item = {
key = key,
semaphore = semaphore(),
qname = qname,
r_opts = deepcopy(r_opts),
try_list = try_list,
}
queue[key] = item

local ok, err = timer_at(0, executeQuery, item)
if not ok then
Expand All @@ -827,24 +814,40 @@ end
-- @return `result + nil + try_list`, or `nil + err + try_list` in case of errors
local function syncQuery(qname, r_opts, try_list)
local key = qname..":"..r_opts.qtype
local item = queue[key]

local item = queue_get_query(key, try_list)

-- If nothing is in progress, we start a new sync query
-- if nothing is in progress, we start a new async query
if not item then
item = enqueue_query(key, qname, r_opts, try_list)
local err
item, err = asyncQuery(qname, r_opts, try_list)
if not item then
return item, err, try_list
end
else
add_status_to_try_list(try_list, "in progress (sync)")
end

item.result, item.err = individualQuery(qname, item.r_opts, try_list)
local supported_semaphore_wait_phases = {
rewrite = true,
access = true,
content = true,
timer = true,
ssl_cert = true,
ssl_session_fetch = true,
}

dequeue_query(item)
local ngx_phase = get_phase()

return item.result, item.err, try_list
if not supported_semaphore_wait_phases[ngx_phase] then
-- phase not supported by `semaphore:wait`
-- return existing query (item)
--
-- this will avoid:
-- "dns lookup pool exceeded retries" (second try and subsequent retries)
-- "API disabled in the context of init_worker_by_lua" (first try)
return item, nil, try_list
end

-- If the query is already in progress, we wait for it.

add_status_to_try_list(try_list, "in progress (sync)")

-- block and wait for the async query to complete
local ok, err = item.semaphore:wait(resolve_max_wait)
if ok and item.result then
Expand All @@ -857,14 +860,6 @@ local function syncQuery(qname, r_opts, try_list)
return item.result, item.err, try_list
end

-- bug checks
if not ok and not item.err then
item.err = err -- only first expired wait() reports error
log(ALERT, PREFIX, "semaphore:wait(", resolve_max_wait, ") failed: ", err,
", count: ", item.semaphore and item.semaphore:count(),
", qname: ", qname)
end

err = err or item.err or "unknown"
add_status_to_try_list(try_list, "error: "..err)

Expand Down
22 changes: 6 additions & 16 deletions spec/01-unit/21-dns-client/02-client_spec.lua
Original file line number Diff line number Diff line change
Expand Up @@ -582,10 +582,7 @@ describe("[DNS client]", function()
}
}))
query_func = function(self, original_query_func, name, options)
-- The first request uses syncQuery not waiting on the
-- aysncQuery timer, so the low-level r:query() could not sleep(5s),
-- it can only sleep(timeout).
ngx.sleep(math.min(timeout, 5))
ngx.sleep(5)
return nil
end
local start_time = ngx.now()
Expand Down Expand Up @@ -1748,12 +1745,9 @@ describe("[DNS client]", function()
end)

it("timeout while waiting", function()

local timeout = 500
local ip = "1.4.2.3"
-- basically the local function _synchronized_query
assert(client.init({
timeout = timeout,
timeout = 500,
retrans = 1,
resolvConf = {
-- resolv.conf without `search` and `domain` options
Expand All @@ -1764,7 +1758,7 @@ describe("[DNS client]", function()
-- insert a stub thats waits and returns a fixed record
local name = TEST_DOMAIN
query_func = function()
local ip = ip
local ip = "1.4.2.3"
local entry = {
{
type = client.TYPE_A,
Expand All @@ -1776,9 +1770,7 @@ describe("[DNS client]", function()
touch = 0,
expire = gettime() + 10,
}
-- wait before we return the results
-- `+ 2` s ensures that the semaphore:wait() expires
sleep(timeout/1000 + 2)
sleep(0.5) -- wait before we return the results
return entry
end

Expand Down Expand Up @@ -1808,12 +1800,10 @@ describe("[DNS client]", function()
ngx.thread.wait(coros[i]) -- this wait will resume the scheduled ones
end

-- results[1~9] are equal, as they all will wait for the first response
for i = 1, 9 do
-- all results are equal, as they all will wait for the first response
for i = 1, 10 do
assert.equal("timeout", results[i])
end
-- results[10] comes from synchronous DNS access of the first request
assert.equal(ip, results[10][1]["address"])
end)
end)

Expand Down
7 changes: 4 additions & 3 deletions t/03-dns-client/01-phases.t
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use Test::Nginx::Socket;

plan tests => repeat_each() * (blocks() * 4 + 1);
plan tests => repeat_each() * (blocks() * 5);

workers(6);

Expand Down Expand Up @@ -59,7 +59,8 @@ qq {
GET /t
--- response_body
answers: nil
err: nil
--- error_log
err: dns client error: 101 empty record received
--- no_error_log
[error]
dns lookup pool exceeded retries
API disabled in the context of init_worker_by_lua
Loading

0 comments on commit a6b24fe

Please sign in to comment.