From 4f4af4045d23b1251aa91990293a9f77a37ec11c Mon Sep 17 00:00:00 2001 From: Aapo Talvensaari Date: Wed, 23 Mar 2022 14:22:07 +0200 Subject: [PATCH] perf(clustering) conditional rebuilding of router, plugins iterator and balancer on dp (#8519) ### Summary Implements conditional rebuilding of `router`, `plugins iterator` and `balancer` on data planes. This means that DPs will not rebuild router if there were no changes in routes or services. Similarly, the plugins iterator will not be rebuild if there were no changes to plugins, and finally balancer in not reinitialized if there are no changes to upstreams or targets. --- kong/clustering/control_plane.lua | 14 ++--- kong/clustering/data_plane.lua | 25 ++++++--- kong/clustering/init.lua | 51 +++++++++++++++++-- kong/db/declarative/init.lua | 30 +++++++++-- kong/runloop/handler.lua | 43 +++++++++++++--- spec/01-unit/19-hybrid/02-clustering_spec.lua | 9 ++-- 6 files changed, 137 insertions(+), 35 deletions(-) diff --git a/kong/clustering/control_plane.lua b/kong/clustering/control_plane.lua index f3af11df3237..792f67eb20bf 100644 --- a/kong/clustering/control_plane.lua +++ b/kong/clustering/control_plane.lua @@ -187,7 +187,7 @@ local function get_removed_fields(dp_version_number) unknown_fields[plugin] = {} end for _, k in ipairs(fields) do - table.insert(unknown_fields[plugin], k) + table_insert(unknown_fields[plugin], k) end end end @@ -201,12 +201,10 @@ _M._get_removed_fields = get_removed_fields -- returns has_update, modified_deflated_payload, err local function update_compatible_payload(payload, dp_version, log_suffix) local fields = get_removed_fields(dp_version_num(dp_version)) - if fields then payload = utils.deep_copy(payload, false) local config_table = payload["config_table"] local has_update = invalidate_keys_from_config(config_table["plugins"], fields) - if has_update then local deflated_payload, err = deflate_gzip(cjson_encode(payload)) if deflated_payload then @@ -236,24 +234,21 @@ function _M:export_deflated_reconfigure_payload() end end - -- store serialized plugins map for troubleshooting purposes local shm_key_name = "clustering:cp_plugins_configured:worker_" .. ngx.worker.id() kong_dict:set(shm_key_name, cjson_encode(self.plugins_configured)); ngx_log(ngx_DEBUG, "plugin configuration map key: " .. shm_key_name .. " configuration: ", kong_dict:get(shm_key_name)) - local config_hash = self:calculate_config_hash(config_table) + local config_hash, hashes = self:calculate_config_hash(config_table) local payload = { type = "reconfigure", timestamp = ngx_now(), config_table = config_table, config_hash = config_hash, + hashes = hashes, } - if not payload then - return nil, err - end self.reconfigure_payload = payload payload, err = deflate_gzip(cjson_encode(payload)) @@ -261,6 +256,7 @@ function _M:export_deflated_reconfigure_payload() return nil, err end + self.current_hashes = hashes self.current_config_hash = config_hash self.deflated_reconfigure_payload = payload @@ -754,7 +750,7 @@ function _M:handle_cp_websocket() deflated_payload = self.deflated_reconfigure_payload elseif err then ngx_log(ngx_WARN, "unable to update compatible payload: ", err, ", the unmodified config ", - "is returned", log_suffix) + "is returned", log_suffix) deflated_payload = self.deflated_reconfigure_payload end diff --git a/kong/clustering/data_plane.lua b/kong/clustering/data_plane.lua index 6ab1675db37d..16793f2f9216 100644 --- a/kong/clustering/data_plane.lua +++ b/kong/clustering/data_plane.lua @@ -75,20 +75,27 @@ function _M:decode_config(config) end -function _M:update_config(config_table, config_hash, update_cache) +function _M:update_config(config_table, config_hash, update_cache, hashes) assert(type(config_table) == "table") if not config_hash then - config_hash = self:calculate_config_hash(config_table) + config_hash, hashes = self:calculate_config_hash(config_table) + end + + local current_hash = declarative.get_current_hash() + if current_hash == config_hash then + ngx_log(ngx_DEBUG, _log_prefix, "same config received from control plane, ", + "no need to reload") + return true end local entities, err, _, meta, new_hash = - self.declarative_config:parse_table(config_table, config_hash) + self.declarative_config:parse_table(config_table, config_hash) if not entities then return nil, "bad config received from control plane " .. err end - if declarative.get_current_hash() == new_hash then + if current_hash == new_hash then ngx_log(ngx_DEBUG, _log_prefix, "same config received from control plane, ", "no need to reload") return true @@ -96,8 +103,9 @@ function _M:update_config(config_table, config_hash, update_cache) -- NOTE: no worker mutex needed as this code can only be -- executed by worker 0 + local res, err = - declarative.load_into_cache_with_events(entities, meta, new_hash) + declarative.load_into_cache_with_events(entities, meta, new_hash, hashes) if not res then return nil, err end @@ -290,10 +298,12 @@ function _M:communicate(premature) local ok, err = config_semaphore:wait(1) if ok then local config_table = self.next_config - local config_hash = self.next_hash if config_table then + local config_hash = self.next_hash + local hashes = self.next_hashes + local pok, res - pok, res, err = pcall(self.update_config, self, config_table, config_hash, true) + pok, res, err = pcall(self.update_config, self, config_table, config_hash, true, hashes) if pok then if not res then ngx_log(ngx_ERR, _log_prefix, "unable to update running config: ", err) @@ -371,6 +381,7 @@ function _M:communicate(premature) self.next_config = assert(msg.config_table) self.next_hash = msg.config_hash + self.next_hashes = msg.hashes if config_semaphore:count() <= 0 then -- the following line always executes immediately after the `if` check diff --git a/kong/clustering/init.lua b/kong/clustering/init.lua index 01202aafa78a..d859a6dbdda9 100644 --- a/kong/clustering/init.lua +++ b/kong/clustering/init.lua @@ -1,6 +1,6 @@ local _M = {} - +local constants = require("kong.constants") local pl_file = require("pl.file") local pl_tablex = require("pl.tablex") local ssl = require("ngx.ssl") @@ -21,6 +21,9 @@ local sort = table.sort local type = type +local DECLARATIVE_EMPTY_CONFIG_HASH = constants.DECLARATIVE_EMPTY_CONFIG_HASH + + local MT = { __index = _M, } @@ -104,7 +107,7 @@ local function to_sorted_string(value) end else - error("invalid type to be sorted (JSON types are supported") + error("invalid type to be sorted (JSON types are supported)") end end @@ -134,7 +137,49 @@ end function _M:calculate_config_hash(config_table) - return ngx_md5(to_sorted_string(config_table)) + if type(config_table) ~= "table" then + local config_hash = ngx_md5(to_sorted_string(config_table)) + return config_hash, { config = config_hash } + end + + local routes = config_table.routes + local services = config_table.services + local plugins = config_table.plugins + local upstreams = config_table.upstreams + local targets = config_table.targets + + local routes_hash = routes and ngx_md5(to_sorted_string(routes)) or DECLARATIVE_EMPTY_CONFIG_HASH + local services_hash = services and ngx_md5(to_sorted_string(services)) or DECLARATIVE_EMPTY_CONFIG_HASH + local plugins_hash = plugins and ngx_md5(to_sorted_string(plugins)) or DECLARATIVE_EMPTY_CONFIG_HASH + local upstreams_hash = upstreams and ngx_md5(to_sorted_string(upstreams)) or DECLARATIVE_EMPTY_CONFIG_HASH + local targets_hash = targets and ngx_md5(to_sorted_string(targets)) or DECLARATIVE_EMPTY_CONFIG_HASH + + config_table.routes = nil + config_table.services = nil + config_table.plugins = nil + config_table.upstreams = nil + config_table.targets = nil + + local config_hash = ngx_md5(to_sorted_string(config_table) .. routes_hash + .. services_hash + .. plugins_hash + .. upstreams_hash + .. targets_hash) + + config_table.routes = routes + config_table.services = services + config_table.plugins = plugins + config_table.upstreams = upstreams + config_table.targets = targets + + return config_hash, { + config = config_hash, + routes = routes_hash, + services = services_hash, + plugins = plugins_hash, + upstreams = upstreams_hash, + targets = targets_hash, + } end diff --git a/kong/db/declarative/init.lua b/kong/db/declarative/init.lua index 37a03da151f6..6fe730ee9f7f 100644 --- a/kong/db/declarative/init.lua +++ b/kong/db/declarative/init.lua @@ -865,7 +865,7 @@ end do local DECLARATIVE_PAGE_KEY = constants.DECLARATIVE_PAGE_KEY - function declarative.load_into_cache_with_events(entities, meta, hash) + function declarative.load_into_cache_with_events(entities, meta, hash, hashes) if exiting() then return nil, "exiting" end @@ -926,7 +926,31 @@ do local default_ws ok, err, default_ws = declarative.load_into_cache(entities, meta, hash, SHADOW) if ok then - ok, err = worker_events.post("declarative", "flip_config", default_ws) + local router_hash + local plugins_hash + local balancer_hash + if hashes then + if hashes.routes ~= DECLARATIVE_EMPTY_CONFIG_HASH then + router_hash = md5(hashes.services .. hashes.routes) + else + router_hash = DECLARATIVE_EMPTY_CONFIG_HASH + end + + plugins_hash = hashes.plugins + + if hashes.upstreams ~= DECLARATIVE_EMPTY_CONFIG_HASH or hashes.targets ~= DECLARATIVE_EMPTY_CONFIG_HASH then + balancer_hash = md5(hashes.upstreams .. hashes.targets) + else + balancer_hash = DECLARATIVE_EMPTY_CONFIG_HASH + end + end + + ok, err = worker_events.post("declarative", "flip_config", { + default_ws, + router_hash, + plugins_hash, + balancer_hash + }) if ok ~= "done" then kong_shm:delete(DECLARATIVE_LOCK_KEY) return nil, "failed to flip declarative config cache pages: " .. (err or ok) @@ -983,7 +1007,7 @@ do end --- prevent POST /config (declarative.load_into_cache_with_events eary-exits) +-- prevent POST /config (declarative.load_into_cache_with_events early-exits) -- only "succeeds" the first time it gets called. -- successive calls return nil, "exists" function declarative.try_lock() diff --git a/kong/runloop/handler.lua b/kong/runloop/handler.lua index 7e3243d133b1..ae75a51cb185 100644 --- a/kong/runloop/handler.lua +++ b/kong/runloop/handler.lua @@ -322,8 +322,8 @@ local function register_balancer_events(core_cache, worker_events, cluster_event return end - singletons.core_cache:invalidate_local("balancer:upstreams") - singletons.core_cache:invalidate_local("balancer:upstreams:" .. upstream.id) + core_cache:invalidate_local("balancer:upstreams") + core_cache:invalidate_local("balancer:upstreams:" .. upstream.id) -- => to balancer update balancer.on_upstream_event(operation, upstream) @@ -360,14 +360,33 @@ local function register_events() -- declarative config updates - worker_events.register(function(default_ws) + local current_router_hash + local current_plugins_hash + local current_balancer_hash + + worker_events.register(function(data) if ngx.worker.exiting() then log(NOTICE, "declarative flip config canceled: process exiting") return true end + local default_ws + local router_hash + local plugins_hash + local balancer_hash + + if type(data) == "table" then + default_ws = data[1] + router_hash = data[2] + plugins_hash = data[3] + balancer_hash = data[4] + end + local ok, err = concurrency.with_coroutine_mutex(FLIP_CONFIG_OPTS, function() - balancer.stop_healthcheckers(CLEAR_HEALTH_STATUS_DELAY) + local rebuild_balancer = balancer_hash == nil or balancer_hash ~= current_balancer_hash + if rebuild_balancer then + balancer.stop_healthcheckers(CLEAR_HEALTH_STATUS_DELAY) + end kong.cache:flip() core_cache:flip() @@ -375,10 +394,20 @@ local function register_events() kong.default_workspace = default_ws ngx.ctx.workspace = kong.default_workspace - rebuild_plugins_iterator(PLUGINS_ITERATOR_SYNC_OPTS) - rebuild_router(ROUTER_SYNC_OPTS) + if plugins_hash == nil or plugins_hash ~= current_plugins_hash then + rebuild_plugins_iterator(PLUGINS_ITERATOR_SYNC_OPTS) + current_plugins_hash = plugins_hash + end - balancer.init() + if router_hash == nil or router_hash ~= current_router_hash then + rebuild_router(ROUTER_SYNC_OPTS) + current_router_hash = router_hash + end + + if rebuild_balancer then + balancer.init() + current_balancer_hash = balancer_hash + end declarative.lock() diff --git a/spec/01-unit/19-hybrid/02-clustering_spec.lua b/spec/01-unit/19-hybrid/02-clustering_spec.lua index 43080a2c71cd..8cb589b5bb0a 100644 --- a/spec/01-unit/19-hybrid/02-clustering_spec.lua +++ b/spec/01-unit/19-hybrid/02-clustering_spec.lua @@ -167,16 +167,13 @@ describe("kong.clustering", function() for _ = 1, 10 do local hash = clustering.calculate_config_hash(clustering, value) assert.is_string(hash) - assert.equal("99914b932bd37a50b983c5e7c90ae93b", hash) + assert.equal("aaf38faf0b5851d711027bb4d812d50d", hash) end - local correct = ngx.md5("{}") - assert.equal("99914b932bd37a50b983c5e7c90ae93b", correct) - for _ = 1, 10 do local hash = clustering.calculate_config_hash(clustering, value) assert.is_string(hash) - assert.equal(correct, hash) + assert.equal("aaf38faf0b5851d711027bb4d812d50d", hash) end end) @@ -207,7 +204,7 @@ describe("kong.clustering", function() for _ = 1, 10 do local hash = clustering.calculate_config_hash(clustering, value) assert.is_string(hash) - assert.equal("e287bdd83a30b3c83c498e6e524f619b", hash) + assert.equal("cb83c48d5b2932d1bc9d13672b433365", hash) assert.equal(h, hash) end end)