Skip to content

Commit

Permalink
perf(clustering) conditional rebuilding of router, plugins iterator a…
Browse files Browse the repository at this point in the history
…nd balancer on dp (#8519)

### Summary

Implements conditional rebuilding of `router`, `plugins iterator` and `balancer` on
data planes. This means that DPs will not rebuild router if there were no changes in
routes or services. Similarly, the plugins iterator will not be rebuild if there were
no changes to plugins, and finally balancer in not reinitialized if there are no
changes to upstreams or targets.
  • Loading branch information
bungle authored Mar 23, 2022
1 parent 23b9393 commit 4f4af40
Show file tree
Hide file tree
Showing 6 changed files with 137 additions and 35 deletions.
14 changes: 5 additions & 9 deletions kong/clustering/control_plane.lua
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ local function get_removed_fields(dp_version_number)
unknown_fields[plugin] = {}
end
for _, k in ipairs(fields) do
table.insert(unknown_fields[plugin], k)
table_insert(unknown_fields[plugin], k)
end
end
end
Expand All @@ -201,12 +201,10 @@ _M._get_removed_fields = get_removed_fields
-- returns has_update, modified_deflated_payload, err
local function update_compatible_payload(payload, dp_version, log_suffix)
local fields = get_removed_fields(dp_version_num(dp_version))

if fields then
payload = utils.deep_copy(payload, false)
local config_table = payload["config_table"]
local has_update = invalidate_keys_from_config(config_table["plugins"], fields)

if has_update then
local deflated_payload, err = deflate_gzip(cjson_encode(payload))
if deflated_payload then
Expand Down Expand Up @@ -236,31 +234,29 @@ function _M:export_deflated_reconfigure_payload()
end
end


-- store serialized plugins map for troubleshooting purposes
local shm_key_name = "clustering:cp_plugins_configured:worker_" .. ngx.worker.id()
kong_dict:set(shm_key_name, cjson_encode(self.plugins_configured));
ngx_log(ngx_DEBUG, "plugin configuration map key: " .. shm_key_name .. " configuration: ", kong_dict:get(shm_key_name))

local config_hash = self:calculate_config_hash(config_table)
local config_hash, hashes = self:calculate_config_hash(config_table)

local payload = {
type = "reconfigure",
timestamp = ngx_now(),
config_table = config_table,
config_hash = config_hash,
hashes = hashes,
}

if not payload then
return nil, err
end
self.reconfigure_payload = payload

payload, err = deflate_gzip(cjson_encode(payload))
if not payload then
return nil, err
end

self.current_hashes = hashes
self.current_config_hash = config_hash
self.deflated_reconfigure_payload = payload

Expand Down Expand Up @@ -754,7 +750,7 @@ function _M:handle_cp_websocket()
deflated_payload = self.deflated_reconfigure_payload
elseif err then
ngx_log(ngx_WARN, "unable to update compatible payload: ", err, ", the unmodified config ",
"is returned", log_suffix)
"is returned", log_suffix)
deflated_payload = self.deflated_reconfigure_payload
end

Expand Down
25 changes: 18 additions & 7 deletions kong/clustering/data_plane.lua
Original file line number Diff line number Diff line change
Expand Up @@ -75,29 +75,37 @@ function _M:decode_config(config)
end


function _M:update_config(config_table, config_hash, update_cache)
function _M:update_config(config_table, config_hash, update_cache, hashes)
assert(type(config_table) == "table")

if not config_hash then
config_hash = self:calculate_config_hash(config_table)
config_hash, hashes = self:calculate_config_hash(config_table)
end

local current_hash = declarative.get_current_hash()
if current_hash == config_hash then
ngx_log(ngx_DEBUG, _log_prefix, "same config received from control plane, ",
"no need to reload")
return true
end

local entities, err, _, meta, new_hash =
self.declarative_config:parse_table(config_table, config_hash)
self.declarative_config:parse_table(config_table, config_hash)
if not entities then
return nil, "bad config received from control plane " .. err
end

if declarative.get_current_hash() == new_hash then
if current_hash == new_hash then
ngx_log(ngx_DEBUG, _log_prefix, "same config received from control plane, ",
"no need to reload")
return true
end

-- NOTE: no worker mutex needed as this code can only be
-- executed by worker 0

local res, err =
declarative.load_into_cache_with_events(entities, meta, new_hash)
declarative.load_into_cache_with_events(entities, meta, new_hash, hashes)
if not res then
return nil, err
end
Expand Down Expand Up @@ -290,10 +298,12 @@ function _M:communicate(premature)
local ok, err = config_semaphore:wait(1)
if ok then
local config_table = self.next_config
local config_hash = self.next_hash
if config_table then
local config_hash = self.next_hash
local hashes = self.next_hashes

local pok, res
pok, res, err = pcall(self.update_config, self, config_table, config_hash, true)
pok, res, err = pcall(self.update_config, self, config_table, config_hash, true, hashes)
if pok then
if not res then
ngx_log(ngx_ERR, _log_prefix, "unable to update running config: ", err)
Expand Down Expand Up @@ -371,6 +381,7 @@ function _M:communicate(premature)

self.next_config = assert(msg.config_table)
self.next_hash = msg.config_hash
self.next_hashes = msg.hashes

if config_semaphore:count() <= 0 then
-- the following line always executes immediately after the `if` check
Expand Down
51 changes: 48 additions & 3 deletions kong/clustering/init.lua
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
local _M = {}


local constants = require("kong.constants")
local pl_file = require("pl.file")
local pl_tablex = require("pl.tablex")
local ssl = require("ngx.ssl")
Expand All @@ -21,6 +21,9 @@ local sort = table.sort
local type = type


local DECLARATIVE_EMPTY_CONFIG_HASH = constants.DECLARATIVE_EMPTY_CONFIG_HASH


local MT = { __index = _M, }


Expand Down Expand Up @@ -104,7 +107,7 @@ local function to_sorted_string(value)
end

else
error("invalid type to be sorted (JSON types are supported")
error("invalid type to be sorted (JSON types are supported)")
end
end

Expand Down Expand Up @@ -134,7 +137,49 @@ end


function _M:calculate_config_hash(config_table)
return ngx_md5(to_sorted_string(config_table))
if type(config_table) ~= "table" then
local config_hash = ngx_md5(to_sorted_string(config_table))
return config_hash, { config = config_hash }
end

local routes = config_table.routes
local services = config_table.services
local plugins = config_table.plugins
local upstreams = config_table.upstreams
local targets = config_table.targets

local routes_hash = routes and ngx_md5(to_sorted_string(routes)) or DECLARATIVE_EMPTY_CONFIG_HASH
local services_hash = services and ngx_md5(to_sorted_string(services)) or DECLARATIVE_EMPTY_CONFIG_HASH
local plugins_hash = plugins and ngx_md5(to_sorted_string(plugins)) or DECLARATIVE_EMPTY_CONFIG_HASH
local upstreams_hash = upstreams and ngx_md5(to_sorted_string(upstreams)) or DECLARATIVE_EMPTY_CONFIG_HASH
local targets_hash = targets and ngx_md5(to_sorted_string(targets)) or DECLARATIVE_EMPTY_CONFIG_HASH

config_table.routes = nil
config_table.services = nil
config_table.plugins = nil
config_table.upstreams = nil
config_table.targets = nil

local config_hash = ngx_md5(to_sorted_string(config_table) .. routes_hash
.. services_hash
.. plugins_hash
.. upstreams_hash
.. targets_hash)

config_table.routes = routes
config_table.services = services
config_table.plugins = plugins
config_table.upstreams = upstreams
config_table.targets = targets

return config_hash, {
config = config_hash,
routes = routes_hash,
services = services_hash,
plugins = plugins_hash,
upstreams = upstreams_hash,
targets = targets_hash,
}
end


Expand Down
30 changes: 27 additions & 3 deletions kong/db/declarative/init.lua
Original file line number Diff line number Diff line change
Expand Up @@ -865,7 +865,7 @@ end
do
local DECLARATIVE_PAGE_KEY = constants.DECLARATIVE_PAGE_KEY

function declarative.load_into_cache_with_events(entities, meta, hash)
function declarative.load_into_cache_with_events(entities, meta, hash, hashes)
if exiting() then
return nil, "exiting"
end
Expand Down Expand Up @@ -926,7 +926,31 @@ do
local default_ws
ok, err, default_ws = declarative.load_into_cache(entities, meta, hash, SHADOW)
if ok then
ok, err = worker_events.post("declarative", "flip_config", default_ws)
local router_hash
local plugins_hash
local balancer_hash
if hashes then
if hashes.routes ~= DECLARATIVE_EMPTY_CONFIG_HASH then
router_hash = md5(hashes.services .. hashes.routes)
else
router_hash = DECLARATIVE_EMPTY_CONFIG_HASH
end

plugins_hash = hashes.plugins

if hashes.upstreams ~= DECLARATIVE_EMPTY_CONFIG_HASH or hashes.targets ~= DECLARATIVE_EMPTY_CONFIG_HASH then
balancer_hash = md5(hashes.upstreams .. hashes.targets)
else
balancer_hash = DECLARATIVE_EMPTY_CONFIG_HASH
end
end

ok, err = worker_events.post("declarative", "flip_config", {
default_ws,
router_hash,
plugins_hash,
balancer_hash
})
if ok ~= "done" then
kong_shm:delete(DECLARATIVE_LOCK_KEY)
return nil, "failed to flip declarative config cache pages: " .. (err or ok)
Expand Down Expand Up @@ -983,7 +1007,7 @@ do
end


-- prevent POST /config (declarative.load_into_cache_with_events eary-exits)
-- prevent POST /config (declarative.load_into_cache_with_events early-exits)
-- only "succeeds" the first time it gets called.
-- successive calls return nil, "exists"
function declarative.try_lock()
Expand Down
43 changes: 36 additions & 7 deletions kong/runloop/handler.lua
Original file line number Diff line number Diff line change
Expand Up @@ -322,8 +322,8 @@ local function register_balancer_events(core_cache, worker_events, cluster_event
return
end

singletons.core_cache:invalidate_local("balancer:upstreams")
singletons.core_cache:invalidate_local("balancer:upstreams:" .. upstream.id)
core_cache:invalidate_local("balancer:upstreams")
core_cache:invalidate_local("balancer:upstreams:" .. upstream.id)

-- => to balancer update
balancer.on_upstream_event(operation, upstream)
Expand Down Expand Up @@ -360,25 +360,54 @@ local function register_events()

-- declarative config updates

worker_events.register(function(default_ws)
local current_router_hash
local current_plugins_hash
local current_balancer_hash

worker_events.register(function(data)
if ngx.worker.exiting() then
log(NOTICE, "declarative flip config canceled: process exiting")
return true
end

local default_ws
local router_hash
local plugins_hash
local balancer_hash

if type(data) == "table" then
default_ws = data[1]
router_hash = data[2]
plugins_hash = data[3]
balancer_hash = data[4]
end

local ok, err = concurrency.with_coroutine_mutex(FLIP_CONFIG_OPTS, function()
balancer.stop_healthcheckers(CLEAR_HEALTH_STATUS_DELAY)
local rebuild_balancer = balancer_hash == nil or balancer_hash ~= current_balancer_hash
if rebuild_balancer then
balancer.stop_healthcheckers(CLEAR_HEALTH_STATUS_DELAY)
end

kong.cache:flip()
core_cache:flip()

kong.default_workspace = default_ws
ngx.ctx.workspace = kong.default_workspace

rebuild_plugins_iterator(PLUGINS_ITERATOR_SYNC_OPTS)
rebuild_router(ROUTER_SYNC_OPTS)
if plugins_hash == nil or plugins_hash ~= current_plugins_hash then
rebuild_plugins_iterator(PLUGINS_ITERATOR_SYNC_OPTS)
current_plugins_hash = plugins_hash
end

balancer.init()
if router_hash == nil or router_hash ~= current_router_hash then
rebuild_router(ROUTER_SYNC_OPTS)
current_router_hash = router_hash
end

if rebuild_balancer then
balancer.init()
current_balancer_hash = balancer_hash
end

declarative.lock()

Expand Down
9 changes: 3 additions & 6 deletions spec/01-unit/19-hybrid/02-clustering_spec.lua
Original file line number Diff line number Diff line change
Expand Up @@ -167,16 +167,13 @@ describe("kong.clustering", function()
for _ = 1, 10 do
local hash = clustering.calculate_config_hash(clustering, value)
assert.is_string(hash)
assert.equal("99914b932bd37a50b983c5e7c90ae93b", hash)
assert.equal("aaf38faf0b5851d711027bb4d812d50d", hash)
end

local correct = ngx.md5("{}")
assert.equal("99914b932bd37a50b983c5e7c90ae93b", correct)

for _ = 1, 10 do
local hash = clustering.calculate_config_hash(clustering, value)
assert.is_string(hash)
assert.equal(correct, hash)
assert.equal("aaf38faf0b5851d711027bb4d812d50d", hash)
end
end)

Expand Down Expand Up @@ -207,7 +204,7 @@ describe("kong.clustering", function()
for _ = 1, 10 do
local hash = clustering.calculate_config_hash(clustering, value)
assert.is_string(hash)
assert.equal("e287bdd83a30b3c83c498e6e524f619b", hash)
assert.equal("cb83c48d5b2932d1bc9d13672b433365", hash)
assert.equal(h, hash)
end
end)
Expand Down

0 comments on commit 4f4af40

Please sign in to comment.