Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(clustering) do not register events multiple times #9082

Merged
merged 1 commit into from
Jul 12, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 2 additions & 46 deletions kong/clustering/control_plane.lua
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ local ipairs = ipairs
local tonumber = tonumber
local ngx = ngx
local ngx_log = ngx.log
local timer_at = ngx.timer.at
local cjson_decode = cjson.decode
local cjson_encode = cjson.encode
local kong = kong
Expand Down Expand Up @@ -571,9 +572,7 @@ end

function _M:init_worker(plugins_list)
-- ROLE = "control_plane"

self.plugins_list = plugins_list

self.plugins_map = plugins_list_to_map(plugins_list)

self.deflated_reconfigure_payload = nil
Expand All @@ -588,49 +587,6 @@ function _M:init_worker(plugins_list)

local push_config_semaphore = semaphore.new()

-- Sends "clustering", "push_config" to all workers in the same node, including self
local function post_push_config_event()
local res, err = kong.worker_events.post("clustering", "push_config")
if not res then
ngx_log(ngx_ERR, _log_prefix, "unable to broadcast event: ", err)
end
end

-- Handles "clustering:push_config" cluster event
local function handle_clustering_push_config_event(data)
ngx_log(ngx_DEBUG, _log_prefix, "received clustering:push_config event for ", data)
post_push_config_event()
end


-- Handles "dao:crud" worker event and broadcasts "clustering:push_config" cluster event
local function handle_dao_crud_event(data)
if type(data) ~= "table" or data.schema == nil or data.schema.db_export == false then
return
end

kong.cluster_events:broadcast("clustering:push_config", data.schema.name .. ":" .. data.operation)

-- we have to re-broadcast event using `post` because the dao
-- events were sent using `post_local` which means not all workers
-- can receive it
post_push_config_event()
end

-- The "clustering:push_config" cluster event gets inserted in the cluster when there's
-- a crud change (like an insertion or deletion). Only one worker per kong node receives
-- this callback. This makes such node post push_config events to all the cp workers on
-- its node
kong.cluster_events:subscribe("clustering:push_config", handle_clustering_push_config_event)

-- The "dao:crud" event is triggered using post_local, which eventually generates an
-- ""clustering:push_config" cluster event. It is assumed that the workers in the
-- same node where the dao:crud event originated will "know" about the update mostly via
-- changes in the cache shared dict. Since data planes don't use the cache, nodes in the same
-- kong node where the event originated will need to be notified so they push config to
-- their data planes
kong.worker_events.register(handle_dao_crud_event, "dao:crud")

-- When "clustering", "push_config" worker event is received by a worker,
-- it loads and pushes the config to its the connected data planes
kong.worker_events.register(function(_)
Expand All @@ -642,7 +598,7 @@ function _M:init_worker(plugins_list)
end
end, "clustering", "push_config")

ngx.timer.at(0, push_config_loop, self, push_config_semaphore,
timer_at(0, push_config_loop, self, push_config_semaphore,
self.conf.db_update_frequency)
end

Expand Down
48 changes: 47 additions & 1 deletion kong/clustering/init.lua
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ local openssl_x509 = require("resty.openssl.x509")
local ngx_log = ngx.log
local assert = assert
local sort = table.sort
local type = type


local check_protocol_support =
require("kong.clustering.utils").check_protocol_support
Expand All @@ -22,6 +24,37 @@ local ngx_WARN = ngx.WARN
local _log_prefix = "[clustering] "


-- Sends "clustering", "push_config" to all workers in the same node, including self
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we put those config sync related codes into a separate file?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will be done on following refactoring PR.

local function post_push_config_event()
local res, err = kong.worker_events.post("clustering", "push_config")
if not res then
ngx_log(ngx_ERR, _log_prefix, "unable to broadcast event: ", err)
end
end


-- Handles "clustering:push_config" cluster event
local function handle_clustering_push_config_event(data)
ngx_log(ngx_DEBUG, _log_prefix, "received clustering:push_config event for ", data)
post_push_config_event()
end


-- Handles "dao:crud" worker event and broadcasts "clustering:push_config" cluster event
local function handle_dao_crud_event(data)
if type(data) ~= "table" or data.schema == nil or data.schema.db_export == false then
return
end

kong.cluster_events:broadcast("clustering:push_config", data.schema.name .. ":" .. data.operation)

-- we have to re-broadcast event using `post` because the dao
-- events were sent using `post_local` which means not all workers
-- can receive it
post_push_config_event()
end


function _M.new(conf)
assert(conf, "conf can not be nil", 2)

Expand Down Expand Up @@ -61,6 +94,20 @@ function _M:handle_wrpc_websocket()
end

function _M:init_cp_worker(plugins_list)
-- The "clustering:push_config" cluster event gets inserted in the cluster when there's
-- a crud change (like an insertion or deletion). Only one worker per kong node receives
-- this callback. This makes such node post push_config events to all the cp workers on
-- its node
kong.cluster_events:subscribe("clustering:push_config", handle_clustering_push_config_event)

-- The "dao:crud" event is triggered using post_local, which eventually generates an
-- ""clustering:push_config" cluster event. It is assumed that the workers in the
-- same node where the dao:crud event originated will "know" about the update mostly via
-- changes in the cache shared dict. Since data planes don't use the cache, nodes in the same
-- kong node where the event originated will need to be notified so they push config to
-- their data planes
kong.worker_events.register(handle_dao_crud_event, "dao:crud")

self.json_handler:init_worker(plugins_list)
if not kong.configuration.legacy_hybrid_protocol then
self.wrpc_handler:init_worker(plugins_list)
Expand Down Expand Up @@ -115,7 +162,6 @@ function _M:init_worker()

local role = self.conf.role


if kong.configuration.legacy_hybrid_protocol then
ngx_log(ngx_WARN, _log_prefix, "forcing to use legacy protocol (over WebSocket)")
end
Expand Down
46 changes: 0 additions & 46 deletions kong/clustering/wrpc_control_plane.lua
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ local init_negotiation_server = require("kong.clustering.services.negotiation").
local calculate_config_hash = require("kong.clustering.config_helper").calculate_config_hash
local string = string
local setmetatable = setmetatable
local type = type
local pcall = pcall
local pairs = pairs
local ngx = ngx
Expand Down Expand Up @@ -306,9 +305,7 @@ end

function _M:init_worker(plugins_list)
-- ROLE = "control_plane"

self.plugins_list = plugins_list

self.plugins_map = plugins_list_to_map(plugins_list)

self.deflated_reconfigure_payload = nil
Expand All @@ -323,49 +320,6 @@ function _M:init_worker(plugins_list)

local push_config_semaphore = semaphore.new()

-- Sends "clustering", "push_config" to all workers in the same node, including self
local function post_push_config_event()
local res, err = kong.worker_events.post("clustering", "push_config")
if not res then
ngx_log(ngx_ERR, _log_prefix, "unable to broadcast event: ", err)
end
end

-- Handles "clustering:push_config" cluster event
local function handle_clustering_push_config_event(data)
ngx_log(ngx_DEBUG, _log_prefix, "received clustering:push_config event for ", data)
post_push_config_event()
end


-- Handles "dao:crud" worker event and broadcasts "clustering:push_config" cluster event
local function handle_dao_crud_event(data)
if type(data) ~= "table" or data.schema == nil or data.schema.db_export == false then
return
end

kong.cluster_events:broadcast("clustering:push_config", data.schema.name .. ":" .. data.operation)

-- we have to re-broadcast event using `post` because the dao
-- events were sent using `post_local` which means not all workers
-- can receive it
post_push_config_event()
end

-- The "clustering:push_config" cluster event gets inserted in the cluster when there's
-- a crud change (like an insertion or deletion). Only one worker per kong node receives
-- this callback. This makes such node post push_config events to all the cp workers on
-- its node
kong.cluster_events:subscribe("clustering:push_config", handle_clustering_push_config_event)

-- The "dao:crud" event is triggered using post_local, which eventually generates an
-- ""clustering:push_config" cluster event. It is assumed that the workers in the
-- same node where the dao:crud event originated will "know" about the update mostly via
-- changes in the cache shared dict. Since data planes don't use the cache, nodes in the same
-- kong node where the event originated will need to be notified so they push config to
-- their data planes
kong.worker_events.register(handle_dao_crud_event, "dao:crud")

-- When "clustering", "push_config" worker event is received by a worker,
-- it loads and pushes the config to its the connected data planes
kong.worker_events.register(function(_)
Expand Down