Skip to content

Commit

Permalink
fix(clustering) do not register events multiple times (#9082)
Browse files Browse the repository at this point in the history
### Summary

This reduces double event handling and event publishing on clustering.

On further PRs we can reduce double exporting, json decoding, calculate hashing
etc. that are common between wrpc control plane and json control plane, but here
I just fixed the double event handling.
  • Loading branch information
bungle authored Jul 12, 2022
1 parent 3b50924 commit 8b6346f
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 93 deletions.
48 changes: 2 additions & 46 deletions kong/clustering/control_plane.lua
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ local ipairs = ipairs
local tonumber = tonumber
local ngx = ngx
local ngx_log = ngx.log
local timer_at = ngx.timer.at
local cjson_decode = cjson.decode
local cjson_encode = cjson.encode
local kong = kong
Expand Down Expand Up @@ -571,9 +572,7 @@ end

function _M:init_worker(plugins_list)
-- ROLE = "control_plane"

self.plugins_list = plugins_list

self.plugins_map = plugins_list_to_map(plugins_list)

self.deflated_reconfigure_payload = nil
Expand All @@ -588,49 +587,6 @@ function _M:init_worker(plugins_list)

local push_config_semaphore = semaphore.new()

-- Sends "clustering", "push_config" to all workers in the same node, including self
local function post_push_config_event()
local res, err = kong.worker_events.post("clustering", "push_config")
if not res then
ngx_log(ngx_ERR, _log_prefix, "unable to broadcast event: ", err)
end
end

-- Handles "clustering:push_config" cluster event
local function handle_clustering_push_config_event(data)
ngx_log(ngx_DEBUG, _log_prefix, "received clustering:push_config event for ", data)
post_push_config_event()
end


-- Handles "dao:crud" worker event and broadcasts "clustering:push_config" cluster event
local function handle_dao_crud_event(data)
if type(data) ~= "table" or data.schema == nil or data.schema.db_export == false then
return
end

kong.cluster_events:broadcast("clustering:push_config", data.schema.name .. ":" .. data.operation)

-- we have to re-broadcast event using `post` because the dao
-- events were sent using `post_local` which means not all workers
-- can receive it
post_push_config_event()
end

-- The "clustering:push_config" cluster event gets inserted in the cluster when there's
-- a crud change (like an insertion or deletion). Only one worker per kong node receives
-- this callback. This makes such node post push_config events to all the cp workers on
-- its node
kong.cluster_events:subscribe("clustering:push_config", handle_clustering_push_config_event)

-- The "dao:crud" event is triggered using post_local, which eventually generates an
-- ""clustering:push_config" cluster event. It is assumed that the workers in the
-- same node where the dao:crud event originated will "know" about the update mostly via
-- changes in the cache shared dict. Since data planes don't use the cache, nodes in the same
-- kong node where the event originated will need to be notified so they push config to
-- their data planes
kong.worker_events.register(handle_dao_crud_event, "dao:crud")

-- When "clustering", "push_config" worker event is received by a worker,
-- it loads and pushes the config to its the connected data planes
kong.worker_events.register(function(_)
Expand All @@ -642,7 +598,7 @@ function _M:init_worker(plugins_list)
end
end, "clustering", "push_config")

ngx.timer.at(0, push_config_loop, self, push_config_semaphore,
timer_at(0, push_config_loop, self, push_config_semaphore,
self.conf.db_update_frequency)
end

Expand Down
48 changes: 47 additions & 1 deletion kong/clustering/init.lua
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ local openssl_x509 = require("resty.openssl.x509")
local ngx_log = ngx.log
local assert = assert
local sort = table.sort
local type = type


local check_protocol_support =
require("kong.clustering.utils").check_protocol_support
Expand All @@ -22,6 +24,37 @@ local ngx_WARN = ngx.WARN
local _log_prefix = "[clustering] "


-- Sends "clustering", "push_config" to all workers in the same node, including self
local function post_push_config_event()
local res, err = kong.worker_events.post("clustering", "push_config")
if not res then
ngx_log(ngx_ERR, _log_prefix, "unable to broadcast event: ", err)
end
end


-- Handles "clustering:push_config" cluster event
local function handle_clustering_push_config_event(data)
ngx_log(ngx_DEBUG, _log_prefix, "received clustering:push_config event for ", data)
post_push_config_event()
end


-- Handles "dao:crud" worker event and broadcasts "clustering:push_config" cluster event
local function handle_dao_crud_event(data)
if type(data) ~= "table" or data.schema == nil or data.schema.db_export == false then
return
end

kong.cluster_events:broadcast("clustering:push_config", data.schema.name .. ":" .. data.operation)

-- we have to re-broadcast event using `post` because the dao
-- events were sent using `post_local` which means not all workers
-- can receive it
post_push_config_event()
end


function _M.new(conf)
assert(conf, "conf can not be nil", 2)

Expand Down Expand Up @@ -61,6 +94,20 @@ function _M:handle_wrpc_websocket()
end

function _M:init_cp_worker(plugins_list)
-- The "clustering:push_config" cluster event gets inserted in the cluster when there's
-- a crud change (like an insertion or deletion). Only one worker per kong node receives
-- this callback. This makes such node post push_config events to all the cp workers on
-- its node
kong.cluster_events:subscribe("clustering:push_config", handle_clustering_push_config_event)

-- The "dao:crud" event is triggered using post_local, which eventually generates an
-- ""clustering:push_config" cluster event. It is assumed that the workers in the
-- same node where the dao:crud event originated will "know" about the update mostly via
-- changes in the cache shared dict. Since data planes don't use the cache, nodes in the same
-- kong node where the event originated will need to be notified so they push config to
-- their data planes
kong.worker_events.register(handle_dao_crud_event, "dao:crud")

self.json_handler:init_worker(plugins_list)
if not kong.configuration.legacy_hybrid_protocol then
self.wrpc_handler:init_worker(plugins_list)
Expand Down Expand Up @@ -115,7 +162,6 @@ function _M:init_worker()

local role = self.conf.role


if kong.configuration.legacy_hybrid_protocol then
ngx_log(ngx_WARN, _log_prefix, "forcing to use legacy protocol (over WebSocket)")
end
Expand Down
46 changes: 0 additions & 46 deletions kong/clustering/wrpc_control_plane.lua
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ local init_negotiation_server = require("kong.clustering.services.negotiation").
local calculate_config_hash = require("kong.clustering.config_helper").calculate_config_hash
local string = string
local setmetatable = setmetatable
local type = type
local pcall = pcall
local pairs = pairs
local ngx = ngx
Expand Down Expand Up @@ -306,9 +305,7 @@ end

function _M:init_worker(plugins_list)
-- ROLE = "control_plane"

self.plugins_list = plugins_list

self.plugins_map = plugins_list_to_map(plugins_list)

self.deflated_reconfigure_payload = nil
Expand All @@ -323,49 +320,6 @@ function _M:init_worker(plugins_list)

local push_config_semaphore = semaphore.new()

-- Sends "clustering", "push_config" to all workers in the same node, including self
local function post_push_config_event()
local res, err = kong.worker_events.post("clustering", "push_config")
if not res then
ngx_log(ngx_ERR, _log_prefix, "unable to broadcast event: ", err)
end
end

-- Handles "clustering:push_config" cluster event
local function handle_clustering_push_config_event(data)
ngx_log(ngx_DEBUG, _log_prefix, "received clustering:push_config event for ", data)
post_push_config_event()
end


-- Handles "dao:crud" worker event and broadcasts "clustering:push_config" cluster event
local function handle_dao_crud_event(data)
if type(data) ~= "table" or data.schema == nil or data.schema.db_export == false then
return
end

kong.cluster_events:broadcast("clustering:push_config", data.schema.name .. ":" .. data.operation)

-- we have to re-broadcast event using `post` because the dao
-- events were sent using `post_local` which means not all workers
-- can receive it
post_push_config_event()
end

-- The "clustering:push_config" cluster event gets inserted in the cluster when there's
-- a crud change (like an insertion or deletion). Only one worker per kong node receives
-- this callback. This makes such node post push_config events to all the cp workers on
-- its node
kong.cluster_events:subscribe("clustering:push_config", handle_clustering_push_config_event)

-- The "dao:crud" event is triggered using post_local, which eventually generates an
-- ""clustering:push_config" cluster event. It is assumed that the workers in the
-- same node where the dao:crud event originated will "know" about the update mostly via
-- changes in the cache shared dict. Since data planes don't use the cache, nodes in the same
-- kong node where the event originated will need to be notified so they push config to
-- their data planes
kong.worker_events.register(handle_dao_crud_event, "dao:crud")

-- When "clustering", "push_config" worker event is received by a worker,
-- it loads and pushes the config to its the connected data planes
kong.worker_events.register(function(_)
Expand Down

0 comments on commit 8b6346f

Please sign in to comment.