diff --git a/changelog/unreleased/kong/separate_kong_cache_invalidation_cluster_event_channel.yml b/changelog/unreleased/kong/separate_kong_cache_invalidation_cluster_event_channel.yml new file mode 100644 index 000000000000..ab0c68bc357a --- /dev/null +++ b/changelog/unreleased/kong/separate_kong_cache_invalidation_cluster_event_channel.yml @@ -0,0 +1,4 @@ +message: | + Each Kong cache instance now utilizes its own cluster event channel. This approach isolates cache invalidation events and reducing the generation of unnecessary worker events. +type: bugfix +scope: Core diff --git a/kong/cache/init.lua b/kong/cache/init.lua index dcf2d173c133..91b21c64a1ac 100644 --- a/kong/cache/init.lua +++ b/kong/cache/init.lua @@ -86,6 +86,10 @@ function _M.new(opts) error("opts.resty_lock_opts must be a table", 2) end + if opts.invalidation_channel and type(opts.invalidation_channel) ~= "string" then + error("opts.invalidation_channel must be a string", 2) + end + local shm_name = opts.shm_name if not shared[shm_name] then log(ERR, "shared dictionary ", shm_name, " not found") @@ -131,6 +135,8 @@ function _M.new(opts) end local cluster_events = opts.cluster_events + local invalidation_channel = opts.invalidation_channel + or ("invalidations_" .. shm_name) local self = { cluster_events = cluster_events, mlcache = mlcache, @@ -138,10 +144,11 @@ function _M.new(opts) shm_name = shm_name, ttl = ttl, neg_ttl = neg_ttl, + invalidation_channel = invalidation_channel, } - local ok, err = cluster_events:subscribe("invalidations", function(key) - log(DEBUG, "received invalidate event from cluster for key: '", key, "'") + local ok, err = cluster_events:subscribe(self.invalidation_channel, function(key) + log(DEBUG, self.shm_name .. " received invalidate event from cluster for key: '", key, "'") self:invalidate_local(key) end) if not ok then @@ -230,7 +237,7 @@ function _M:invalidate_local(key) error("key must be a string", 2) end - log(DEBUG, "invalidating (local): '", key, "'") + log(DEBUG, self.shm_name, " invalidating (local): '", key, "'") local ok, err = self.mlcache:delete(key) if not ok then @@ -248,7 +255,7 @@ function _M:invalidate(key) log(DEBUG, "broadcasting (cluster) invalidation for key: '", key, "'") - local ok, err = self.cluster_events:broadcast("invalidations", key) + local ok, err = self.cluster_events:broadcast(self.invalidation_channel, key) if not ok then log(ERR, "failed to broadcast cached entity invalidation: ", err) end diff --git a/kong/global.lua b/kong/global.lua index ace19ae87fbe..468f55bf821f 100644 --- a/kong/global.lua +++ b/kong/global.lua @@ -249,16 +249,17 @@ function _GLOBAL.init_cache(kong_config, cluster_events, worker_events) end return kong_cache.new({ - shm_name = "kong_db_cache", - cluster_events = cluster_events, - worker_events = worker_events, - ttl = db_cache_ttl, - neg_ttl = db_cache_neg_ttl or db_cache_ttl, - resurrect_ttl = kong_config.resurrect_ttl, - page = page, - cache_pages = cache_pages, - resty_lock_opts = LOCK_OPTS, - lru_size = get_lru_size(kong_config), + shm_name = "kong_db_cache", + cluster_events = cluster_events, + worker_events = worker_events, + ttl = db_cache_ttl, + neg_ttl = db_cache_neg_ttl or db_cache_ttl, + resurrect_ttl = kong_config.resurrect_ttl, + page = page, + cache_pages = cache_pages, + resty_lock_opts = LOCK_OPTS, + lru_size = get_lru_size(kong_config), + invalidation_channel = "invalidations", }) end diff --git a/spec/fixtures/custom_plugins/kong/plugins/invalidations/handler.lua b/spec/fixtures/custom_plugins/kong/plugins/invalidations/handler.lua index 91ccfd67e5af..059a96b61c66 100644 --- a/spec/fixtures/custom_plugins/kong/plugins/invalidations/handler.lua +++ b/spec/fixtures/custom_plugins/kong/plugins/invalidations/handler.lua @@ -15,6 +15,10 @@ function Invalidations:init_worker() assert(kong.cluster_events:subscribe("invalidations", function(key) counts[key] = (counts[key] or 0) + 1 end)) + + assert(kong.cluster_events:subscribe("invalidations_kong_core_db_cache", function(key) + counts[key] = (counts[key] or 0) + 1 + end)) end