diff --git a/kong/clustering/services/sync/rpc.lua b/kong/clustering/services/sync/rpc.lua index 0b6970e4c2f7..4d67808f74e0 100644 --- a/kong/clustering/services/sync/rpc.lua +++ b/kong/clustering/services/sync/rpc.lua @@ -8,14 +8,21 @@ local constants = require("kong.constants") local concurrency = require("kong.concurrency") local isempty = require("table.isempty") local events = require("kong.runloop.events") +local lrucache = require("resty.lrucache") +local EMPTY = require("kong.tools.table").EMPTY local insert_entity_for_txn = declarative.insert_entity_for_txn local delete_entity_for_txn = declarative.delete_entity_for_txn local DECLARATIVE_HASH_KEY = constants.DECLARATIVE_HASH_KEY local DECLARATIVE_DEFAULT_WORKSPACE_KEY = constants.DECLARATIVE_DEFAULT_WORKSPACE_KEY +local DECLARATIVE_EMPTY_CONFIG_HASH = constants.DECLARATIVE_EMPTY_CONFIG_HASH local CLUSTERING_SYNC_STATUS = constants.CLUSTERING_SYNC_STATUS local SYNC_MUTEX_OPTS = { name = "get_delta", timeout = 0, } +local encode_base64 = ngx.encode_base64 +local decode_base64 = ngx.decode_base64 +local cjson_encode = require("cjson.safe").encode + local assert = assert @@ -25,6 +32,7 @@ local ngx_null = ngx.null local ngx_log = ngx.log local ngx_ERR = ngx.ERR local ngx_INFO = ngx.INFO +local ngx_NOTICE = ngx.NOTICE local ngx_DEBUG = ngx.DEBUG @@ -32,9 +40,49 @@ local ngx_DEBUG = ngx.DEBUG local FULL_SYNC_THRESHOLD = 512 -function _M.new(strategy) +local function decode_pagination_status(pagination_status) + if not pagination_status then + return nil, nil + end + + -- base64 encoded json + local decoded = decode_base64(pagination_status) + if not decoded then + return nil, "failed to base64 decode pagination status:" .. err + end + + decoded, err = cjson_decode(decoded) + if not decoded then + return nil, "failed to cjson decode pagination status:" .. err + end + + return decoded.version, decoded.page_size, decoded.next_page +end + + + +local function encode_pagination_status(version, page_size, next_page) + local data = { + version = version, + page_size = page_size, + next_page = next_page, + } + + local json, err = cjson_encode(data) + if not json then + return nil, "failed to encode pagination:" .. err + end + + return encode_base64(json) +end + + +function _M.new(strategy, opts) + opts = opts or EMPTY + local self = { strategy = strategy, + page_size = opts.page_size, } return setmetatable(self, _MT) @@ -46,20 +94,78 @@ local function inc_sync_result(res) end -local function full_sync_result() - local deltas, err = declarative.export_config_sync() - if not deltas then - return nil, err +function _M:full_sync_result(full_sync_status) + local target_version, page_size, next_page = decode_pagination_status(full_sync_status) + + -- page_size mean err here + if (not target_version) and page_size then + return nil, "communication error: " .. page_size + end + + -- try fetch from cache + local config_deltas + if target_version then + config_deltas = self.full_sync_cache:get(target_version) + -- DP tries to fetch unknown version or cache expired/evicted/missed + -- we consider it the first time full sync call + if not config_deltas then + ngx_log(ngx_NOTICE, "full sync cache miss for version: ", target_version) + end + end + + -- first time full sync call, need wipe and begin the full sync session + local first_time = not config_deltas + if first_time then + -- set the target_version for the first time + config_deltas, target_version = declarative.export_config_sync() + if not config_deltas then + return nil, target_version + end + + local ok, err = self.full_sync_cache:set(target_version, config_deltas) + if not ok then + return "failed to cache full sync deltas: " .. err + end + end + + local begin = next_page or 1 + page_size = page_size or self.page_size + next_page = begin + page_size + + -- at this point, + -- config_deltas, target_version, page_size, next_page are all guaranteed to be non-nil + + -- no more deltas. end the session for DP + local last_time = next_page > #config_deltas + + -- get the deltas for the current page + local deltas, n = {}, 1 + for i = begin, next_page - 1 do + local delta = config_deltas[i] + if not delta then + break + end + + deltas[n] = delta + n = n + 1 + end + + -- TODO: handle new deltas those which happen during the full sync + + local full_sync_status + if not last_time then + full_sync_status = encode_pagination_status(target_version, page_size, next_page) end - -- wipe dp lmdb, full sync - return { default = { deltas = deltas, wipe = true, }, } + return { default = { deltas = deltas, wipe = first_time, full_sync_done = last_time, full_sync_status = full_sync_status }, } end function _M:init_cp(manager) local purge_delay = manager.conf.cluster_data_plane_purge_delay + self.full_sync_cache = lrucache.new(10) + -- CP -- Method: kong.sync.v2.get_delta -- Params: versions: list of current versions of the database @@ -103,7 +209,8 @@ function _M:init_cp(manager) -- is the node empty? If so, just do a full sync to bring it up to date faster if default_namespace_version == 0 or - latest_version - default_namespace_version > FULL_SYNC_THRESHOLD + latest_version - default_namespace_version > FULL_SYNC_THRESHOLD or + default_namespace.full_sync_status then -- we need to full sync because holes are found @@ -112,7 +219,7 @@ function _M:init_cp(manager) ", current_version: ", default_namespace_version, ", forcing a full sync") - return full_sync_result() + return self:full_sync_result(default_namespace.full_sync_status) end -- do we need an incremental sync? @@ -199,14 +306,19 @@ local function is_rpc_ready() end -local function do_sync() +local function do_sync(dp_status) if not is_rpc_ready() then return nil, "rpc is not ready" end + -- when in a partial sync, even if a update notification triggers a sync, it will + -- be blocked by the mutex, and it will continue to do the rest of the sync + local in_full_sync = dp_status.full_sync_status + local msg = { default = { version = tonumber(declarative.get_current_hash()) or 0, + full_sync_status = in_full_sync, }, } @@ -217,7 +329,7 @@ local function do_sync() end -- ns_deltas should look like: - -- { default = { deltas = { ... }, wipe = true, }, } + -- { default = { deltas = { ... }, wipe = true, full_sync_done = false, full_sync_status = ...}, } local ns_delta = ns_deltas.default if not ns_delta then @@ -231,6 +343,11 @@ local function do_sync() return true end + if ns_delta.full_sync_status then + -- full sync is in progress + in_full_sync = true + end + -- we should find the correct default workspace -- and replace the old one with it local default_ws_changed @@ -247,9 +364,15 @@ local function do_sync() local t = txn.begin(512) + -- begining of the full sync session, wipe the lmdb and purge the cache local wipe = ns_delta.wipe if wipe then t:db_drop(false) + kong.core_cache:purge() + kong.cache:purge() + -- we are at a unready state + -- consider the config empty + t:set(DECLARATIVE_HASH_KEY, DECLARATIVE_EMPTY_CONFIG_HASH) end local db = kong.db @@ -298,8 +421,8 @@ local function do_sync() ", version: ", delta_version, ", type: ", delta_type) - -- wipe the whole lmdb, should not have events - if not wipe then + -- during the full sync, should not emit events + if not in_full_sync then ev = { delta_type, old_entity and "update" or "create", delta_entity, old_entity, } end @@ -310,8 +433,8 @@ local function do_sync() return nil, err end - -- If we will wipe lmdb, we don't need to delete it from lmdb. - if old_entity and not wipe then + -- during the full sync, should not emit events + if old_entity and not in_full_sync then local res, err = delete_entity_for_txn(t, delta_type, old_entity, opts) if not res then return nil, err @@ -323,14 +446,14 @@ local function do_sync() ", version: ", delta_version, ", type: ", delta_type) - -- wipe the whole lmdb, should not have events - if not wipe then + -- delete the entity, opts for getting correct lmdb key + if not in_full_sync then ev = { delta_type, "delete", old_entity, } end end -- if delta_entity ~= nil and delta_entity ~= ngx_null - -- wipe the whole lmdb, should not have events - if not wipe then + -- during the full sync, should not emit events + if not in_full_sync then crud_events_n = crud_events_n + 1 crud_events[crud_events_n] = ev end @@ -343,9 +466,12 @@ local function do_sync() end end -- for _, delta - -- store current sync version - t:set(DECLARATIVE_HASH_KEY, fmt("%032d", version)) - + -- only update the sync version if not in full sync/ full sync done + if (not in_full_sync) or ns_delta.full_sync_done then + -- store current sync version + t:set(DECLARATIVE_HASH_KEY, fmt("%032d", version)) + end + -- store the correct default workspace uuid if default_ws_changed then t:set(DECLARATIVE_DEFAULT_WORKSPACE_KEY, kong.default_workspace) @@ -356,32 +482,32 @@ local function do_sync() return nil, err end - if wipe then - kong.core_cache:purge() - kong.cache:purge() + dp_status.full_sync_status = ns_delta.full_sync_status + -- the full sync is done + if ns_delta.full_sync_done then -- Trigger other workers' callbacks like reconfigure_handler. -- -- Full sync could rebuild route, plugins and balancer route, so their -- hashes are nil. + -- Until this point, the dataplane is not ready to serve requests or to + -- do delta syncs. local reconfigure_data = { kong.default_workspace, nil, nil, nil, } - local ok, err = events.declarative_reconfigure_notify(reconfigure_data) - if not ok then - return nil, err - end + return events.declarative_reconfigure_notify(reconfigure_data) + end - else - for _, event in ipairs(crud_events) do - -- delta_type, crud_event_type, delta.entity, old_entity - db[event[1]]:post_crud_event(event[2], event[3], event[4]) - end + -- emit the CRUD events + -- if in_full_sync, no events should be added into the queue + for _, event in ipairs(crud_events) do + -- delta_type, crud_event_type, delta.entity, old_entity + db[event[1]]:post_crud_event(event[2], event[3], event[4]) end return true end -local function sync_handler(premature) +local function sync_handler(premature, dp_status) if premature then return end @@ -391,7 +517,7 @@ local function sync_handler(premature) -- to CP quickly after sync. (`kong.sync.v2.get_delta` is used for both pulling delta -- as well as status reporting) for _ = 1, 2 do - local ok, err = do_sync() + local ok, err = do_sync(dp_status) if not ok then return nil, err end @@ -402,11 +528,17 @@ local function sync_handler(premature) if not res and err ~= "timeout" then ngx_log(ngx_ERR, "unable to create worker mutex and sync: ", err) end + + if dp_status.full_sync_status then + -- full sync is in progress + -- continue to sync + return _M:sync_once() + end end -local function start_sync_timer(timer_func, delay) - local hdl, err = timer_func(delay, sync_handler) +local function start_sync_timer(timer_func, delay, dp_status) + local hdl, err = timer_func(delay, sync_handler, dp_status) if not hdl then return nil, err @@ -417,12 +549,12 @@ end function _M:sync_once(delay) - return start_sync_timer(ngx.timer.at, delay or 0) + return start_sync_timer(ngx.timer.at, delay or 0, self) end function _M:sync_every(delay) - return start_sync_timer(ngx.timer.every, delay) + return start_sync_timer(ngx.timer.every, delay, self) end diff --git a/kong/db/declarative/export.lua b/kong/db/declarative/export.lua index 9dbe994a42c7..47b13ef78729 100644 --- a/kong/db/declarative/export.lua +++ b/kong/db/declarative/export.lua @@ -134,15 +134,16 @@ local function export_from_db_impl(emitter, skip_ws, skip_disabled_entities, exp return nil, err end - local sync_version + local sync_version = get_latest_version() + local meta_sync_version if emitter.want_sync_version then - sync_version = get_latest_version() + meta_sync_version = sync_version end emitter:emit_toplevel({ _format_version = "3.0", _transform = false, - _sync_version = sync_version, -- only used by sync emitter, DP doesn't care about this + _sync_version = meta_sync_version, -- only used by sync emitter, DP doesn't care about this }) local disabled_services = {} @@ -210,7 +211,7 @@ local function export_from_db_impl(emitter, skip_ws, skip_disabled_entities, exp end_transaction(db) - return emitter:done() + return emitter:done(), sync_version end