Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(cache): mlcache invalidation use separate cluster event channels to avoid useless invalidation #12321

Merged
merged 7 commits into from
Mar 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
message: |
Each Kong cache instance now utilizes its own cluster event channel. This approach isolates cache invalidation events and reducing the generation of unnecessary worker events.
type: bugfix
scope: Core
15 changes: 11 additions & 4 deletions kong/cache/init.lua
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,10 @@ function _M.new(opts)
error("opts.resty_lock_opts must be a table", 2)
end

if opts.invalidation_channel and type(opts.invalidation_channel) ~= "string" then
error("opts.invalidation_channel must be a string", 2)
end

local shm_name = opts.shm_name
if not shared[shm_name] then
log(ERR, "shared dictionary ", shm_name, " not found")
Expand Down Expand Up @@ -131,17 +135,20 @@ function _M.new(opts)
end

local cluster_events = opts.cluster_events
local invalidation_channel = opts.invalidation_channel
or ("invalidations_" .. shm_name)
local self = {
cluster_events = cluster_events,
mlcache = mlcache,
dict = shared[shm_name],
shm_name = shm_name,
ttl = ttl,
neg_ttl = neg_ttl,
invalidation_channel = invalidation_channel,
}

local ok, err = cluster_events:subscribe("invalidations", function(key)
log(DEBUG, "received invalidate event from cluster for key: '", key, "'")
local ok, err = cluster_events:subscribe(self.invalidation_channel, function(key)
log(DEBUG, self.shm_name .. " received invalidate event from cluster for key: '", key, "'")
self:invalidate_local(key)
end)
if not ok then
Expand Down Expand Up @@ -230,7 +237,7 @@ function _M:invalidate_local(key)
error("key must be a string", 2)
end

log(DEBUG, "invalidating (local): '", key, "'")
log(DEBUG, self.shm_name, " invalidating (local): '", key, "'")

local ok, err = self.mlcache:delete(key)
if not ok then
Expand All @@ -248,7 +255,7 @@ function _M:invalidate(key)

log(DEBUG, "broadcasting (cluster) invalidation for key: '", key, "'")

local ok, err = self.cluster_events:broadcast("invalidations", key)
local ok, err = self.cluster_events:broadcast(self.invalidation_channel, key)
if not ok then
log(ERR, "failed to broadcast cached entity invalidation: ", err)
end
Expand Down
21 changes: 11 additions & 10 deletions kong/global.lua
Original file line number Diff line number Diff line change
Expand Up @@ -249,16 +249,17 @@ function _GLOBAL.init_cache(kong_config, cluster_events, worker_events)
end

return kong_cache.new({
shm_name = "kong_db_cache",
cluster_events = cluster_events,
worker_events = worker_events,
ttl = db_cache_ttl,
neg_ttl = db_cache_neg_ttl or db_cache_ttl,
resurrect_ttl = kong_config.resurrect_ttl,
page = page,
cache_pages = cache_pages,
resty_lock_opts = LOCK_OPTS,
lru_size = get_lru_size(kong_config),
shm_name = "kong_db_cache",
cluster_events = cluster_events,
worker_events = worker_events,
ttl = db_cache_ttl,
neg_ttl = db_cache_neg_ttl or db_cache_ttl,
resurrect_ttl = kong_config.resurrect_ttl,
page = page,
cache_pages = cache_pages,
resty_lock_opts = LOCK_OPTS,
lru_size = get_lru_size(kong_config),
invalidation_channel = "invalidations",
})
end

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@ function Invalidations:init_worker()
assert(kong.cluster_events:subscribe("invalidations", function(key)
counts[key] = (counts[key] or 0) + 1
end))

assert(kong.cluster_events:subscribe("invalidations_kong_core_db_cache", function(key)
counts[key] = (counts[key] or 0) + 1
end))
end


Expand Down
Loading