From 8db41446185a1145e4dcaaaa9871b1e592504e3e Mon Sep 17 00:00:00 2001 From: Aapo Talvensaari Date: Mon, 11 Jul 2022 20:43:57 +0300 Subject: [PATCH] fix(clustering) do not register events multiple times ### Summary This reduces double event handling and event publishing on clustering. On further PRs we can reduce double exporting, json decoding, calculate hashing etc. that are common between wrpc control plane and json control plane, but here I just fixed the double event handling. --- kong/clustering/control_plane.lua | 48 ++------------------------ kong/clustering/init.lua | 48 +++++++++++++++++++++++++- kong/clustering/wrpc_control_plane.lua | 46 ------------------------ 3 files changed, 49 insertions(+), 93 deletions(-) diff --git a/kong/clustering/control_plane.lua b/kong/clustering/control_plane.lua index acf4253fa546..4057171fa841 100644 --- a/kong/clustering/control_plane.lua +++ b/kong/clustering/control_plane.lua @@ -18,6 +18,7 @@ local ipairs = ipairs local tonumber = tonumber local ngx = ngx local ngx_log = ngx.log +local timer_at = ngx.timer.at local cjson_decode = cjson.decode local cjson_encode = cjson.encode local kong = kong @@ -571,9 +572,7 @@ end function _M:init_worker(plugins_list) -- ROLE = "control_plane" - self.plugins_list = plugins_list - self.plugins_map = plugins_list_to_map(plugins_list) self.deflated_reconfigure_payload = nil @@ -588,49 +587,6 @@ function _M:init_worker(plugins_list) local push_config_semaphore = semaphore.new() - -- Sends "clustering", "push_config" to all workers in the same node, including self - local function post_push_config_event() - local res, err = kong.worker_events.post("clustering", "push_config") - if not res then - ngx_log(ngx_ERR, _log_prefix, "unable to broadcast event: ", err) - end - end - - -- Handles "clustering:push_config" cluster event - local function handle_clustering_push_config_event(data) - ngx_log(ngx_DEBUG, _log_prefix, "received clustering:push_config event for ", data) - post_push_config_event() - end - - - -- Handles "dao:crud" worker event and broadcasts "clustering:push_config" cluster event - local function handle_dao_crud_event(data) - if type(data) ~= "table" or data.schema == nil or data.schema.db_export == false then - return - end - - kong.cluster_events:broadcast("clustering:push_config", data.schema.name .. ":" .. data.operation) - - -- we have to re-broadcast event using `post` because the dao - -- events were sent using `post_local` which means not all workers - -- can receive it - post_push_config_event() - end - - -- The "clustering:push_config" cluster event gets inserted in the cluster when there's - -- a crud change (like an insertion or deletion). Only one worker per kong node receives - -- this callback. This makes such node post push_config events to all the cp workers on - -- its node - kong.cluster_events:subscribe("clustering:push_config", handle_clustering_push_config_event) - - -- The "dao:crud" event is triggered using post_local, which eventually generates an - -- ""clustering:push_config" cluster event. It is assumed that the workers in the - -- same node where the dao:crud event originated will "know" about the update mostly via - -- changes in the cache shared dict. Since data planes don't use the cache, nodes in the same - -- kong node where the event originated will need to be notified so they push config to - -- their data planes - kong.worker_events.register(handle_dao_crud_event, "dao:crud") - -- When "clustering", "push_config" worker event is received by a worker, -- it loads and pushes the config to its the connected data planes kong.worker_events.register(function(_) @@ -642,7 +598,7 @@ function _M:init_worker(plugins_list) end end, "clustering", "push_config") - ngx.timer.at(0, push_config_loop, self, push_config_semaphore, + timer_at(0, push_config_loop, self, push_config_semaphore, self.conf.db_update_frequency) end diff --git a/kong/clustering/init.lua b/kong/clustering/init.lua index 66b453688573..a2105a66e5f4 100644 --- a/kong/clustering/init.lua +++ b/kong/clustering/init.lua @@ -9,6 +9,8 @@ local openssl_x509 = require("resty.openssl.x509") local ngx_log = ngx.log local assert = assert local sort = table.sort +local type = type + local check_protocol_support = require("kong.clustering.utils").check_protocol_support @@ -22,6 +24,37 @@ local ngx_WARN = ngx.WARN local _log_prefix = "[clustering] " +-- Sends "clustering", "push_config" to all workers in the same node, including self +local function post_push_config_event() + local res, err = kong.worker_events.post("clustering", "push_config") + if not res then + ngx_log(ngx_ERR, _log_prefix, "unable to broadcast event: ", err) + end +end + + +-- Handles "clustering:push_config" cluster event +local function handle_clustering_push_config_event(data) + ngx_log(ngx_DEBUG, _log_prefix, "received clustering:push_config event for ", data) + post_push_config_event() +end + + +-- Handles "dao:crud" worker event and broadcasts "clustering:push_config" cluster event +local function handle_dao_crud_event(data) + if type(data) ~= "table" or data.schema == nil or data.schema.db_export == false then + return + end + + kong.cluster_events:broadcast("clustering:push_config", data.schema.name .. ":" .. data.operation) + + -- we have to re-broadcast event using `post` because the dao + -- events were sent using `post_local` which means not all workers + -- can receive it + post_push_config_event() +end + + function _M.new(conf) assert(conf, "conf can not be nil", 2) @@ -61,6 +94,20 @@ function _M:handle_wrpc_websocket() end function _M:init_cp_worker(plugins_list) + -- The "clustering:push_config" cluster event gets inserted in the cluster when there's + -- a crud change (like an insertion or deletion). Only one worker per kong node receives + -- this callback. This makes such node post push_config events to all the cp workers on + -- its node + kong.cluster_events:subscribe("clustering:push_config", handle_clustering_push_config_event) + + -- The "dao:crud" event is triggered using post_local, which eventually generates an + -- ""clustering:push_config" cluster event. It is assumed that the workers in the + -- same node where the dao:crud event originated will "know" about the update mostly via + -- changes in the cache shared dict. Since data planes don't use the cache, nodes in the same + -- kong node where the event originated will need to be notified so they push config to + -- their data planes + kong.worker_events.register(handle_dao_crud_event, "dao:crud") + self.json_handler:init_worker(plugins_list) if not kong.configuration.legacy_hybrid_protocol then self.wrpc_handler:init_worker(plugins_list) @@ -115,7 +162,6 @@ function _M:init_worker() local role = self.conf.role - if kong.configuration.legacy_hybrid_protocol then ngx_log(ngx_WARN, _log_prefix, "forcing to use legacy protocol (over WebSocket)") end diff --git a/kong/clustering/wrpc_control_plane.lua b/kong/clustering/wrpc_control_plane.lua index 2479495de228..b95b1177b4e3 100644 --- a/kong/clustering/wrpc_control_plane.lua +++ b/kong/clustering/wrpc_control_plane.lua @@ -14,7 +14,6 @@ local init_negotiation_server = require("kong.clustering.services.negotiation"). local calculate_config_hash = require("kong.clustering.config_helper").calculate_config_hash local string = string local setmetatable = setmetatable -local type = type local pcall = pcall local pairs = pairs local ngx = ngx @@ -306,9 +305,7 @@ end function _M:init_worker(plugins_list) -- ROLE = "control_plane" - self.plugins_list = plugins_list - self.plugins_map = plugins_list_to_map(plugins_list) self.deflated_reconfigure_payload = nil @@ -323,49 +320,6 @@ function _M:init_worker(plugins_list) local push_config_semaphore = semaphore.new() - -- Sends "clustering", "push_config" to all workers in the same node, including self - local function post_push_config_event() - local res, err = kong.worker_events.post("clustering", "push_config") - if not res then - ngx_log(ngx_ERR, _log_prefix, "unable to broadcast event: ", err) - end - end - - -- Handles "clustering:push_config" cluster event - local function handle_clustering_push_config_event(data) - ngx_log(ngx_DEBUG, _log_prefix, "received clustering:push_config event for ", data) - post_push_config_event() - end - - - -- Handles "dao:crud" worker event and broadcasts "clustering:push_config" cluster event - local function handle_dao_crud_event(data) - if type(data) ~= "table" or data.schema == nil or data.schema.db_export == false then - return - end - - kong.cluster_events:broadcast("clustering:push_config", data.schema.name .. ":" .. data.operation) - - -- we have to re-broadcast event using `post` because the dao - -- events were sent using `post_local` which means not all workers - -- can receive it - post_push_config_event() - end - - -- The "clustering:push_config" cluster event gets inserted in the cluster when there's - -- a crud change (like an insertion or deletion). Only one worker per kong node receives - -- this callback. This makes such node post push_config events to all the cp workers on - -- its node - kong.cluster_events:subscribe("clustering:push_config", handle_clustering_push_config_event) - - -- The "dao:crud" event is triggered using post_local, which eventually generates an - -- ""clustering:push_config" cluster event. It is assumed that the workers in the - -- same node where the dao:crud event originated will "know" about the update mostly via - -- changes in the cache shared dict. Since data planes don't use the cache, nodes in the same - -- kong node where the event originated will need to be notified so they push config to - -- their data planes - kong.worker_events.register(handle_dao_crud_event, "dao:crud") - -- When "clustering", "push_config" worker event is received by a worker, -- it loads and pushes the config to its the connected data planes kong.worker_events.register(function(_)