From 7fc2f5951d388b9ec0072717d4998321e9bced9e Mon Sep 17 00:00:00 2001 From: Xumin <100666470+Suika-Kong@users.noreply.github.com> Date: Wed, 22 Jun 2022 11:29:23 +0800 Subject: [PATCH] feat(clustering) inline version negotiation (#8926) --- kong-2.8.0-0.rockspec | 2 + kong/clustering/services/negotiation.lua | 333 ++++++++++++++++++ kong/clustering/services/supported.lua | 8 + kong/clustering/utils.lua | 1 + kong/clustering/wrpc_control_plane.lua | 69 ++-- kong/clustering/wrpc_data_plane.lua | 134 ++++--- .../services/negotiation/v1/negotiation.proto | 53 +++ kong/init.lua | 5 - 8 files changed, 516 insertions(+), 89 deletions(-) create mode 100644 kong/clustering/services/negotiation.lua create mode 100644 kong/clustering/services/supported.lua create mode 100644 kong/include/kong/services/negotiation/v1/negotiation.proto diff --git a/kong-2.8.0-0.rockspec b/kong-2.8.0-0.rockspec index 1b118f26a45b..19c490f51c69 100644 --- a/kong-2.8.0-0.rockspec +++ b/kong-2.8.0-0.rockspec @@ -72,6 +72,8 @@ build = { ["kong.clustering.utils"] = "kong/clustering/utils.lua", ["kong.clustering.compat.removed_fields"] = "kong/clustering/compat/removed_fields.lua", ["kong.clustering.config_helper"] = "kong/clustering/config_helper.lua", + ["kong.clustering.services.negotiation"] = "kong/clustering/services/negotiation.lua", + ["kong.clustering.services.supported"] = "kong/clustering/services/supported.lua", ["kong.cluster_events"] = "kong/cluster_events/init.lua", ["kong.cluster_events.strategies.cassandra"] = "kong/cluster_events/strategies/cassandra.lua", diff --git a/kong/clustering/services/negotiation.lua b/kong/clustering/services/negotiation.lua new file mode 100644 index 000000000000..2677302d4ab5 --- /dev/null +++ b/kong/clustering/services/negotiation.lua @@ -0,0 +1,333 @@ +local constants = require "kong.constants" +local clustering_utils = require "kong.clustering.utils" +-- currently they are the same. But it's possible for we to drop support for old version of DP but keep support of CP +local supported_services = require "kong.clustering.services.supported" +local asked_services = require "kong.clustering.services.supported" +local table_clear = require "table.clear" + +local time = ngx.time +local var = ngx.var +local log = ngx.log +local ERR = ngx.ERR +local NOTICE = ngx.NOTICE +local _log_prefix = "[wrpc-clustering] " +local table_concat = table.concat +local lower = string.lower +local pcall = pcall + +-- an optimization. Used when a not modified empty table is needed. +local empty_table = {} + +local pairs = pairs +local ipairs = ipairs +local type = type +local error = error + +local CLUSTERING_SYNC_STATUS = constants.CLUSTERING_SYNC_STATUS +local DECLARATIVE_EMPTY_CONFIG_HASH = constants.DECLARATIVE_EMPTY_CONFIG_HASH + +-- it's so annoying that protobuf does not support map to array +local function wrap_services(services) + local wrapped, idx = {}, 0 + for name, versions in pairs(services or empty_table) do + local wrapped_versions = {} + idx = idx + 1 + wrapped[idx] = { name = name, versions = wrapped_versions, } + + for k, version in ipairs(versions) do + wrapped_versions[k] = version.version + end + end + + return wrapped +end + +local _M = {} + +local function field_validate(tbl, field, typ) + local v = tbl + for i, ind in ipairs(field) do + if type(v) ~= "table" then + error("field '" .. table_concat(field, ".", 1, i - 1) .. "' cannot be indexed with " .. ind) + end + v = v[ind] + end + + local compare_typ = typ + if typ == "array" or typ == "object" then + compare_typ = "table" + end + + if type(v) ~= compare_typ then + local field_name = table_concat(field, '.') + error("field \"" .. field_name .. "\" must be of type " .. typ) + end +end + +local function verify_request(body) + for field, typ in pairs{ + [{ + "node", + }] = "object", + [{ + "node", "type", + }] = "string", + [{ + "node", "version", + }] = "string", + [{ + "services_requested", + }] = "array", + } do + field_validate(body, field, typ) + end +end + +local function verify_node_compatibility(client_node) + if client_node.type ~= "KONG" then + error(("unknown node type %q"):format(client_node.type), CLUSTERING_SYNC_STATUS.UNKNOWN) + end + + local ok, err, result = clustering_utils.check_kong_version_compatibility(kong.version, client_node.version) + if not ok then + error(err) + end + return result +end + +local function negotiate_version(name, versions, known_versions) + local versions_set = {} + for _, version in ipairs(versions) do + versions_set[lower(version)] = true + end + + for _, v in ipairs(known_versions) do + local version = lower(v.version) + if versions_set[version] then + return v + end + end + + return { name = name, description = "No valid version" } +end + +local function negotiate_service(name, versions) + name = lower(name) + + if type(versions) ~= "table" then + error("invalid versions array for service " .. name) + end + + local supported_service = supported_services[name] + if not supported_service then + return { description = "unknown service." } + end + + return negotiate_version(name, versions, supported_service) +end + +local function log_negotiation_result(name, version) + if version.version ~= nil then + log(NOTICE, "service accepted: \"", name, "\", version: ", version.version, ", description: ", version.description) + + else + log(NOTICE, "service rejected: \"", name, "\", reason: ", version.description) + end +end + +local function negotiate_services(services_requested) + local services = {} + + for idx, service in ipairs(services_requested) do + local name = service.name + if type(service) ~= "table" or type(name) ~= "string" then + error("malformed service requested #" .. idx) + end + + local negotiated_version = negotiate_service(name, service.versions) + services[idx] = { + name = name, + negotiated_version = negotiated_version, + } + + log_negotiation_result(name, negotiated_version) + end + + return services +end + + +local function register_client(cluster_data_plane_purge_delay, id, client_node) + local ok, err = kong.db.clustering_data_planes:upsert({ id = id, }, { + last_seen = time(), + config_hash = DECLARATIVE_EMPTY_CONFIG_HASH, + hostname = client_node.hostname, + ip = var.remote_addr, + version = client_node.version, + sync_status = client_node.sync_status, + }, { ttl = cluster_data_plane_purge_delay }) + + if not ok then + log(ERR, _log_prefix, "unable to update clustering data plane status: ", err) + return error(err) + end +end + +local function split_services(services) + local accepted, accepted_n = {}, 0 + local rejected, rejected_n = {}, 0 + for _, service in ipairs(services or empty_table) do + local tbl, idx + local negotiated_version = service.negotiated_version + if negotiated_version.version then + accepted_n = accepted_n + 1 + tbl, idx = accepted, accepted_n + else + rejected_n = rejected_n + 1 + tbl, idx = rejected, rejected_n + end + + tbl[idx] = { + name = service.name, + version = negotiated_version.version, + message = negotiated_version.description, + } + end + + return accepted, rejected +end + +local function info_to_service(info) + return info.name, { + version = info.version, + description = info.message, + } +end + +local function merge_services(accepted, rejected) + local services = {} + for _, serivce in ipairs(accepted or empty_table) do + local name, version = info_to_service(serivce) + services[name] = version + end + + for _, serivce in ipairs(rejected or empty_table) do + local name, version = info_to_service(serivce) + services[name] = version + end + + return services +end + +local cp_description + +local function get_cp_description() + if not cp_description then + cp_description = {} + end + + return cp_description +end + +function _M.init_negotiation_server(service, conf) + service:import("kong.services.negotiation.v1.negotiation") + service:set_handler("NegotiationService.NegotiateServices", function(peer, nego_req) + local ok, result = pcall(function() + + local dp_id = peer.id + log(NOTICE, "negotiating services for DP: ", dp_id) + verify_request(nego_req) + + nego_req.node.sync_status = verify_node_compatibility(nego_req.node) + local services = negotiate_services(nego_req.services_requested) + register_client(conf.cluster_data_plane_purge_delay, dp_id, nego_req.node) + + local accepted, rejected = split_services(services) + + local nego_result = { + node = get_cp_description(), + services_accepted = accepted, + services_rejected = rejected, + } + + return nego_result + end) + + if not ok then + log(ERR, _log_prefix, result) + return { error_message = result } + end + + return result + end) +end + +-- TODO: use event to notify other workers +-- Currently we assume only worker 0 cares about wRPC services +local negotiated_service +local function init_negotiated_service_tab() + if not negotiated_service then + negotiated_service = {} + + else + table_clear(negotiated_service) + end +end + +local function set_negotiated_service(name, version) + negotiated_service[name] = version +end + +local negotiation_request + +local function get_negotiation_request() + if not negotiation_request then + negotiation_request = { + node = { + type = "KONG", + version = kong.version, + hostname = kong.node.get_hostname(), + }, + services_requested = wrap_services(asked_services), + } + end + + return negotiation_request +end + +function _M.negotiate(peer) + local response_data, err = peer:call_async("NegotiationService.NegotiateServices", get_negotiation_request()) + + if not response_data then + return nil, err + end + + if response_data.error_message and not response_data.node then + return nil, response_data.error_message + end + + init_negotiated_service_tab() + local serivces = merge_services(response_data.services_accepted, response_data.services_rejected) + for name, version in pairs(serivces) do + log_negotiation_result(name, version) + set_negotiated_service(name, version) + end + + return response_data, nil +end + +function _M.get_negotiated_service(name) + local result = negotiated_service[name] + if not result then + return nil, "service not supported (and not requested)" + end + return result.version, result.description +end + + +function _M.init_negotiation_client(service) + init_negotiated_service_tab() + service:import("kong.services.negotiation.v1.negotiation") +end + +return _M diff --git a/kong/clustering/services/supported.lua b/kong/clustering/services/supported.lua new file mode 100644 index 000000000000..62871097e2cf --- /dev/null +++ b/kong/clustering/services/supported.lua @@ -0,0 +1,8 @@ +-- DP and CP shares one list. +-- Always order from most to least preferred version. + +return { + config = { + { version = "v1", description = "The configuration synchronizing service. (JSON and Gzip)" }, + }, +} diff --git a/kong/clustering/utils.lua b/kong/clustering/utils.lua index d78f1f978020..76aa92bfd551 100644 --- a/kong/clustering/utils.lua +++ b/kong/clustering/utils.lua @@ -234,6 +234,7 @@ function _M.plugins_list_to_map(plugins_list) return versions end +_M.check_kong_version_compatibility = check_kong_version_compatibility function _M.check_version_compatibility(obj, dp_version, dp_plugin_map, log_suffix) local ok, err, status = check_kong_version_compatibility(KONG_VERSION, dp_version, log_suffix) diff --git a/kong/clustering/wrpc_control_plane.lua b/kong/clustering/wrpc_control_plane.lua index 279df84952ca..2479495de228 100644 --- a/kong/clustering/wrpc_control_plane.lua +++ b/kong/clustering/wrpc_control_plane.lua @@ -10,6 +10,8 @@ local clustering_utils = require("kong.clustering.utils") local wrpc = require("kong.tools.wrpc") local wrpc_proto = require("kong.tools.wrpc.proto") local utils = require("kong.tools.utils") +local init_negotiation_server = require("kong.clustering.services.negotiation").init_negotiation_server +local calculate_config_hash = require("kong.clustering.config_helper").calculate_config_hash local string = string local setmetatable = setmetatable local type = type @@ -23,8 +25,8 @@ local ngx_exit = ngx.exit local exiting = ngx.worker.exiting local ngx_time = ngx.time local ngx_var = ngx.var +local timer_at = ngx.timer.at -local calculate_config_hash = require("kong.clustering.config_helper").calculate_config_hash local plugins_list_to_map = clustering_utils.plugins_list_to_map local deflate_gzip = utils.deflate_gzip local yield = utils.yield @@ -38,7 +40,7 @@ local ngx_CLOSE = ngx.HTTP_CLOSE local CLUSTERING_SYNC_STATUS = constants.CLUSTERING_SYNC_STATUS local _log_prefix = "[wrpc-clustering] " -local wrpc_config_service +local ok_table = { ok = "done", } local function handle_export_deflated_reconfigure_payload(self) @@ -46,36 +48,40 @@ local function handle_export_deflated_reconfigure_payload(self) return ok, p_err or err end +local function init_config_service(wrpc_service, cp) + wrpc_service:import("kong.services.config.v1.config") -local function get_config_service(self) - if not wrpc_config_service then - wrpc_config_service = wrpc_proto.new() - wrpc_config_service:import("kong.services.config.v1.config") + wrpc_service:set_handler("ConfigService.PingCP", function(peer, data) + local client = cp.clients[peer.conn] + if client and client.update_sync_status then + client.last_seen = ngx_time() + client.config_hash = data.hash + client:update_sync_status() + ngx_log(ngx_INFO, _log_prefix, "received ping frame from data plane") + end + end) + + wrpc_service:set_handler("ConfigService.ReportMetadata", function(peer, data) + local client = cp.clients[peer.conn] + if client then + ngx_log(ngx_INFO, _log_prefix, "received initial metadata package from client: ", client.dp_id) + client.basic_info = data + client.basic_info_semaphore:post() + end + return ok_table + end) +end - wrpc_config_service:set_handler("ConfigService.PingCP", function(peer, data) - local client = self.clients[peer.conn] - if client and client.update_sync_status then - client.last_seen = ngx_time() - client.config_hash = data.hash - client:update_sync_status() - ngx_log(ngx_INFO, _log_prefix, "received ping frame from data plane") - end - end) - - wrpc_config_service:set_handler("ConfigService.ReportMetadata", function(peer, data) - local client = self.clients[peer.conn] - if client then - ngx_log(ngx_INFO, _log_prefix, "received initial metadata package from client: ", client.dp_id) - client.basic_info = data - client.basic_info_semaphore:post() - end - return { - ok = "done", - } - end) +local wrpc_service + +local function get_wrpc_service(self) + if not wrpc_service then + wrpc_service = wrpc_proto.new() + init_negotiation_server(wrpc_service, self.conf) + init_config_service(wrpc_service, self) end - return wrpc_config_service + return wrpc_service end @@ -115,7 +121,7 @@ function _M:export_deflated_reconfigure_payload() local shm_key_name = "clustering:cp_plugins_configured:worker_" .. ngx.worker.id() kong_dict:set(shm_key_name, cjson_encode(self.plugins_configured)) - local service = get_config_service(self) + local service = get_wrpc_service(self) -- yield between steps to prevent long delay local config_json = assert(cjson_encode(config_table)) @@ -181,7 +187,8 @@ function _M:handle_cp_websocket() end -- connection established - local w_peer = wrpc.new_peer(wb, get_config_service(self)) + local w_peer = wrpc.new_peer(wb, get_wrpc_service(self)) + w_peer.id = dp_id local client = { last_seen = ngx_time(), peer = w_peer, @@ -370,7 +377,7 @@ function _M:init_worker(plugins_list) end end, "clustering", "push_config") - ngx.timer.at(0, push_config_loop, self, push_config_semaphore, + timer_at(0, push_config_loop, self, push_config_semaphore, self.conf.db_update_frequency) end diff --git a/kong/clustering/wrpc_data_plane.lua b/kong/clustering/wrpc_data_plane.lua index df4b6f47fb6a..60752e10c6d0 100644 --- a/kong/clustering/wrpc_data_plane.lua +++ b/kong/clustering/wrpc_data_plane.lua @@ -7,6 +7,11 @@ local constants = require("kong.constants") local wrpc_proto = require("kong.tools.wrpc.proto") local cjson = require("cjson.safe") local utils = require("kong.tools.utils") +local negotiation = require("kong.clustering.services.negotiation") +local init_negotiation_client = negotiation.init_negotiation_client +local negotiate = negotiation.negotiate +local get_negotiated_service = negotiation.get_negotiated_service + local assert = assert local setmetatable = setmetatable local tonumber = tonumber @@ -24,9 +29,12 @@ local yield = utils.yield local ngx_ERR = ngx.ERR local ngx_INFO = ngx.INFO +local ngx_NOTICE = ngx.NOTICE local PING_INTERVAL = constants.CLUSTERING_PING_INTERVAL local _log_prefix = "[wrpc-clustering] " local DECLARATIVE_EMPTY_CONFIG_HASH = constants.DECLARATIVE_EMPTY_CONFIG_HASH +local accept_table = { accepted = true } + local _M = { DPCP_CHANNEL_NAME = "DP-CP_config", @@ -44,6 +52,7 @@ function _M.new(conf, cert, cert_key) return setmetatable(self, _MT) end +local communicate function _M:init_worker(plugins_list) -- ROLE = "data_plane" @@ -51,76 +60,75 @@ function _M:init_worker(plugins_list) self.plugins_list = plugins_list if ngx.worker.id() == 0 then - assert(ngx.timer.at(0, function(premature) - self:communicate(premature) - end)) + communicate(self) end end -local wrpc_config_service -local function get_config_service() - if not wrpc_config_service then - wrpc_config_service = wrpc_proto.new() - wrpc_config_service:import("kong.services.config.v1.config") - wrpc_config_service:set_handler("ConfigService.SyncConfig", function(peer, data) - -- yield between steps to prevent long delay - if peer.config_semaphore then - peer.config_obj.next_data = data - if peer.config_semaphore:count() <= 0 then - -- the following line always executes immediately after the `if` check - -- because `:count` will never yield, end result is that the semaphore - -- count is guaranteed to not exceed 1 - peer.config_semaphore:post() - end +local function init_config_service(service) + service:import("kong.services.config.v1.config") + service:set_handler("ConfigService.SyncConfig", function(peer, data) + -- yield between steps to prevent long delay + if peer.config_semaphore then + peer.config_obj.next_data = data + if peer.config_semaphore:count() <= 0 then + -- the following line always executes immediately after the `if` check + -- because `:count` will never yield, end result is that the semaphore + -- count is guaranteed to not exceed 1 + peer.config_semaphore:post() end - return { accepted = true } - end) - end - - return wrpc_config_service + end + return accept_table + end) end -function _M:communicate(premature) - if premature then - -- worker wants to exit - return +local wrpc_services +local function get_services() + if not wrpc_services then + wrpc_services = wrpc_proto.new() + init_negotiation_client(wrpc_services) + init_config_service(wrpc_services) end - local conf = self.conf + return wrpc_services +end + +local function communicate_impl(dp) + local conf = dp.conf local log_suffix = " [" .. conf.cluster_control_plane .. "]" - local reconnection_delay = math.random(5, 10) local c, uri, err = clustering_utils.connect_cp( - "/v1/wrpc", conf, self.cert, self.cert_key, + "/v1/wrpc", conf, dp.cert, dp.cert_key, "wrpc.konghq.com") if not c then - ngx_log(ngx_ERR, _log_prefix, "connection to control plane ", uri, " broken: ", err, - " (retrying after ", reconnection_delay, " seconds)", log_suffix) - - assert(ngx.timer.at(reconnection_delay, function(premature) - self:communicate(premature) - end)) - return + error("connection to control plane " .. uri .." broken: " .. err) end local config_semaphore = semaphore.new(0) - local peer = wrpc.new_peer(c, get_config_service(), { channel = self.DPCP_CHANNEL_NAME }) + local peer = wrpc.new_peer(c, get_services(), { channel = dp.DPCP_CHANNEL_NAME }) peer.config_semaphore = config_semaphore - peer.config_obj = self + peer.config_obj = dp peer:spawn_threads() do - local resp, err = peer:call_async("ConfigService.ReportMetadata", { plugins = self.plugins_list }) + local ok, err = negotiate(peer) + if not ok then + error(err) + end + end + + do + local version, msg = get_negotiated_service("config") + if not version then + error("config sync service not supported: " .. msg) + end + local resp, err = peer:call_async("ConfigService.ReportMetadata", { plugins = dp.plugins_list }) -- if resp is not nil, it must be table if not resp or not resp.ok then - ngx_log(ngx_ERR, _log_prefix, "Couldn't report basic info to CP: ", resp and resp.error or err) - assert(ngx.timer.at(reconnection_delay, function(premature) - self:communicate(premature) - end)) + error("Couldn't report basic info to CP: " .. (resp and resp.error or err)) end end @@ -143,7 +151,7 @@ function _M:communicate(premature) config_semaphore = nil end - local data = self.next_data + local data = dp.next_data if data then local config_version = tonumber(data.version) if config_version > last_config_version then @@ -154,7 +162,7 @@ function _M:communicate(premature) ngx_log(ngx_INFO, _log_prefix, "received config #", config_version, log_suffix) local pok, res - pok, res, err = xpcall(config_helper.update, traceback, self.declarative_config, + pok, res, err = xpcall(config_helper.update, traceback, dp.declarative_config, config_table, data.config_hash, data.hashes) if pok then last_config_version = config_version @@ -166,8 +174,8 @@ function _M:communicate(premature) ngx_log(ngx_ERR, _log_prefix, "unable to update running config: ", res) end - if self.next_data == data then - self.next_data = nil + if dp.next_data == data then + dp.next_data = nil end end end @@ -203,10 +211,10 @@ function _M:communicate(premature) c:close() if not ok then - ngx_log(ngx_ERR, _log_prefix, err, log_suffix) + error(err) elseif perr then - ngx_log(ngx_ERR, _log_prefix, perr, log_suffix) + error(perr) end -- the config thread might be holding a lock if it's in the middle of an @@ -220,11 +228,31 @@ function _M:communicate(premature) elseif perr then ngx_log(ngx_ERR, _log_prefix, perr, log_suffix) end +end + +local communicate_loop + +function communicate(dp, reconnection_delay) + return ngx.timer.at(reconnection_delay or 0, communicate_loop, dp) +end + +function communicate_loop(premature, dp) + if premature then + -- worker wants to exit + return + end + + local ok, err = pcall(communicate_impl, dp) + if not ok then + ngx_log(ngx_ERR, err) + end + + -- retry connection + local reconnection_delay = math.random(5, 10) + ngx_log(ngx_NOTICE, " (retrying after " .. reconnection_delay .. " seconds)") if not exiting() then - assert(ngx.timer.at(reconnection_delay, function(premature) - self:communicate(premature) - end)) + communicate(dp, reconnection_delay) end end diff --git a/kong/include/kong/services/negotiation/v1/negotiation.proto b/kong/include/kong/services/negotiation/v1/negotiation.proto new file mode 100644 index 000000000000..473779eb5aaa --- /dev/null +++ b/kong/include/kong/services/negotiation/v1/negotiation.proto @@ -0,0 +1,53 @@ +syntax = "proto3"; + +option go_package = "github.com/kong/koko/internal/gen/wrpc/kong/model;model"; + +package kong.model; + + +// NegotiationService allows DP to ask for services and CP answer with the accepted version of each +// +wrpc:service-id=5 +service NegotiationService { + // NegotiateServices should be the first RPC call upon connecting. It allows the CP + // to get which services does the DP handle. + // + // Call direction: DP to CP + // +wrpc:rpc-id=4 + rpc NegotiateServices(kong.model.NegotiateServicesRequest) returns (kong.model.NegotiateServicesResponse); +} + +message NegotiateServicesRequest { + DPNodeDescription node = 1; + repeated ServiceRequest services_requested = 2; +} + +message DPNodeDescription { + string type = 2; + string version = 3; + string hostname = 4; +} + +message ServiceRequest { + string name = 1; + repeated string versions = 2; +} + +message NegotiateServicesResponse { + string error_message = 1; + CPNodeDescription node = 2; + repeated AcceptedService services_accepted = 3; + repeated RejectedService services_rejected = 4; +} + +message AcceptedService { + string name = 1; + string version = 2; + string message = 3; +} + +message RejectedService { + string name = 1; + string message = 3; +} + +message CPNodeDescription {} \ No newline at end of file diff --git a/kong/init.lua b/kong/init.lua index 73ad90a72eb6..77e1269a06a2 100644 --- a/kong/init.lua +++ b/kong/init.lua @@ -1548,11 +1548,6 @@ function Kong.serve_wrpc_listener(options) end -function Kong.serve_version_handshake() - return kong.clustering:serve_version_handshake() -end - - function Kong.stream_api() stream_api.handle() end