diff --git a/kong/clustering/control_plane.lua b/kong/clustering/control_plane.lua index acf4253fa546..4057171fa841 100644 --- a/kong/clustering/control_plane.lua +++ b/kong/clustering/control_plane.lua @@ -18,6 +18,7 @@ local ipairs = ipairs local tonumber = tonumber local ngx = ngx local ngx_log = ngx.log +local timer_at = ngx.timer.at local cjson_decode = cjson.decode local cjson_encode = cjson.encode local kong = kong @@ -571,9 +572,7 @@ end function _M:init_worker(plugins_list) -- ROLE = "control_plane" - self.plugins_list = plugins_list - self.plugins_map = plugins_list_to_map(plugins_list) self.deflated_reconfigure_payload = nil @@ -588,49 +587,6 @@ function _M:init_worker(plugins_list) local push_config_semaphore = semaphore.new() - -- Sends "clustering", "push_config" to all workers in the same node, including self - local function post_push_config_event() - local res, err = kong.worker_events.post("clustering", "push_config") - if not res then - ngx_log(ngx_ERR, _log_prefix, "unable to broadcast event: ", err) - end - end - - -- Handles "clustering:push_config" cluster event - local function handle_clustering_push_config_event(data) - ngx_log(ngx_DEBUG, _log_prefix, "received clustering:push_config event for ", data) - post_push_config_event() - end - - - -- Handles "dao:crud" worker event and broadcasts "clustering:push_config" cluster event - local function handle_dao_crud_event(data) - if type(data) ~= "table" or data.schema == nil or data.schema.db_export == false then - return - end - - kong.cluster_events:broadcast("clustering:push_config", data.schema.name .. ":" .. data.operation) - - -- we have to re-broadcast event using `post` because the dao - -- events were sent using `post_local` which means not all workers - -- can receive it - post_push_config_event() - end - - -- The "clustering:push_config" cluster event gets inserted in the cluster when there's - -- a crud change (like an insertion or deletion). Only one worker per kong node receives - -- this callback. This makes such node post push_config events to all the cp workers on - -- its node - kong.cluster_events:subscribe("clustering:push_config", handle_clustering_push_config_event) - - -- The "dao:crud" event is triggered using post_local, which eventually generates an - -- ""clustering:push_config" cluster event. It is assumed that the workers in the - -- same node where the dao:crud event originated will "know" about the update mostly via - -- changes in the cache shared dict. Since data planes don't use the cache, nodes in the same - -- kong node where the event originated will need to be notified so they push config to - -- their data planes - kong.worker_events.register(handle_dao_crud_event, "dao:crud") - -- When "clustering", "push_config" worker event is received by a worker, -- it loads and pushes the config to its the connected data planes kong.worker_events.register(function(_) @@ -642,7 +598,7 @@ function _M:init_worker(plugins_list) end end, "clustering", "push_config") - ngx.timer.at(0, push_config_loop, self, push_config_semaphore, + timer_at(0, push_config_loop, self, push_config_semaphore, self.conf.db_update_frequency) end diff --git a/kong/clustering/init.lua b/kong/clustering/init.lua index 66b453688573..a2105a66e5f4 100644 --- a/kong/clustering/init.lua +++ b/kong/clustering/init.lua @@ -9,6 +9,8 @@ local openssl_x509 = require("resty.openssl.x509") local ngx_log = ngx.log local assert = assert local sort = table.sort +local type = type + local check_protocol_support = require("kong.clustering.utils").check_protocol_support @@ -22,6 +24,37 @@ local ngx_WARN = ngx.WARN local _log_prefix = "[clustering] " +-- Sends "clustering", "push_config" to all workers in the same node, including self +local function post_push_config_event() + local res, err = kong.worker_events.post("clustering", "push_config") + if not res then + ngx_log(ngx_ERR, _log_prefix, "unable to broadcast event: ", err) + end +end + + +-- Handles "clustering:push_config" cluster event +local function handle_clustering_push_config_event(data) + ngx_log(ngx_DEBUG, _log_prefix, "received clustering:push_config event for ", data) + post_push_config_event() +end + + +-- Handles "dao:crud" worker event and broadcasts "clustering:push_config" cluster event +local function handle_dao_crud_event(data) + if type(data) ~= "table" or data.schema == nil or data.schema.db_export == false then + return + end + + kong.cluster_events:broadcast("clustering:push_config", data.schema.name .. ":" .. data.operation) + + -- we have to re-broadcast event using `post` because the dao + -- events were sent using `post_local` which means not all workers + -- can receive it + post_push_config_event() +end + + function _M.new(conf) assert(conf, "conf can not be nil", 2) @@ -61,6 +94,20 @@ function _M:handle_wrpc_websocket() end function _M:init_cp_worker(plugins_list) + -- The "clustering:push_config" cluster event gets inserted in the cluster when there's + -- a crud change (like an insertion or deletion). Only one worker per kong node receives + -- this callback. This makes such node post push_config events to all the cp workers on + -- its node + kong.cluster_events:subscribe("clustering:push_config", handle_clustering_push_config_event) + + -- The "dao:crud" event is triggered using post_local, which eventually generates an + -- ""clustering:push_config" cluster event. It is assumed that the workers in the + -- same node where the dao:crud event originated will "know" about the update mostly via + -- changes in the cache shared dict. Since data planes don't use the cache, nodes in the same + -- kong node where the event originated will need to be notified so they push config to + -- their data planes + kong.worker_events.register(handle_dao_crud_event, "dao:crud") + self.json_handler:init_worker(plugins_list) if not kong.configuration.legacy_hybrid_protocol then self.wrpc_handler:init_worker(plugins_list) @@ -115,7 +162,6 @@ function _M:init_worker() local role = self.conf.role - if kong.configuration.legacy_hybrid_protocol then ngx_log(ngx_WARN, _log_prefix, "forcing to use legacy protocol (over WebSocket)") end diff --git a/kong/clustering/wrpc_control_plane.lua b/kong/clustering/wrpc_control_plane.lua index 2479495de228..b95b1177b4e3 100644 --- a/kong/clustering/wrpc_control_plane.lua +++ b/kong/clustering/wrpc_control_plane.lua @@ -14,7 +14,6 @@ local init_negotiation_server = require("kong.clustering.services.negotiation"). local calculate_config_hash = require("kong.clustering.config_helper").calculate_config_hash local string = string local setmetatable = setmetatable -local type = type local pcall = pcall local pairs = pairs local ngx = ngx @@ -306,9 +305,7 @@ end function _M:init_worker(plugins_list) -- ROLE = "control_plane" - self.plugins_list = plugins_list - self.plugins_map = plugins_list_to_map(plugins_list) self.deflated_reconfigure_payload = nil @@ -323,49 +320,6 @@ function _M:init_worker(plugins_list) local push_config_semaphore = semaphore.new() - -- Sends "clustering", "push_config" to all workers in the same node, including self - local function post_push_config_event() - local res, err = kong.worker_events.post("clustering", "push_config") - if not res then - ngx_log(ngx_ERR, _log_prefix, "unable to broadcast event: ", err) - end - end - - -- Handles "clustering:push_config" cluster event - local function handle_clustering_push_config_event(data) - ngx_log(ngx_DEBUG, _log_prefix, "received clustering:push_config event for ", data) - post_push_config_event() - end - - - -- Handles "dao:crud" worker event and broadcasts "clustering:push_config" cluster event - local function handle_dao_crud_event(data) - if type(data) ~= "table" or data.schema == nil or data.schema.db_export == false then - return - end - - kong.cluster_events:broadcast("clustering:push_config", data.schema.name .. ":" .. data.operation) - - -- we have to re-broadcast event using `post` because the dao - -- events were sent using `post_local` which means not all workers - -- can receive it - post_push_config_event() - end - - -- The "clustering:push_config" cluster event gets inserted in the cluster when there's - -- a crud change (like an insertion or deletion). Only one worker per kong node receives - -- this callback. This makes such node post push_config events to all the cp workers on - -- its node - kong.cluster_events:subscribe("clustering:push_config", handle_clustering_push_config_event) - - -- The "dao:crud" event is triggered using post_local, which eventually generates an - -- ""clustering:push_config" cluster event. It is assumed that the workers in the - -- same node where the dao:crud event originated will "know" about the update mostly via - -- changes in the cache shared dict. Since data planes don't use the cache, nodes in the same - -- kong node where the event originated will need to be notified so they push config to - -- their data planes - kong.worker_events.register(handle_dao_crud_event, "dao:crud") - -- When "clustering", "push_config" worker event is received by a worker, -- it loads and pushes the config to its the connected data planes kong.worker_events.register(function(_)