feat(sync): full sync pagination #13940

Draft · wants to merge 1 commit into base: master
212 changes: 172 additions & 40 deletions kong/clustering/services/sync/rpc.lua
@@ -8,14 +8,21 @@ local constants = require("kong.constants")
local concurrency = require("kong.concurrency")
local isempty = require("table.isempty")
local events = require("kong.runloop.events")
local lrucache = require("resty.lrucache")


local EMPTY = require("kong.tools.table").EMPTY
local insert_entity_for_txn = declarative.insert_entity_for_txn
local delete_entity_for_txn = declarative.delete_entity_for_txn
local DECLARATIVE_HASH_KEY = constants.DECLARATIVE_HASH_KEY
local DECLARATIVE_DEFAULT_WORKSPACE_KEY = constants.DECLARATIVE_DEFAULT_WORKSPACE_KEY
local DECLARATIVE_EMPTY_CONFIG_HASH = constants.DECLARATIVE_EMPTY_CONFIG_HASH
local CLUSTERING_SYNC_STATUS = constants.CLUSTERING_SYNC_STATUS
local SYNC_MUTEX_OPTS = { name = "get_delta", timeout = 0, }
local encode_base64 = ngx.encode_base64
local decode_base64 = ngx.decode_base64
local cjson_encode = require("cjson.safe").encode
local cjson_decode = require("cjson.safe").decode



local assert = assert
@@ -25,16 +32,57 @@ local ngx_null = ngx.null
local ngx_log = ngx.log
local ngx_ERR = ngx.ERR
local ngx_INFO = ngx.INFO
local ngx_NOTICE = ngx.NOTICE
local ngx_DEBUG = ngx.DEBUG


-- number of versions behind before a full sync is forced
local FULL_SYNC_THRESHOLD = 512


function _M.new(strategy)
local function decode_pagination_status(pagination_status)
if not pagination_status then
return nil, nil
end

-- base64 encoded json
-- ngx.decode_base64 returns nil on invalid input, without an error message
local decoded = decode_base64(pagination_status)
if not decoded then
return nil, "failed to base64 decode pagination status"
end

local err
decoded, err = cjson_decode(decoded)
if not decoded then
return nil, "failed to cjson decode pagination status: " .. err
end

return decoded.version, decoded.page_size, decoded.next_page
end



local function encode_pagination_status(version, page_size, next_page)
local data = {
version = version,
page_size = page_size,
next_page = next_page,
}

local json, err = cjson_encode(data)
if not json then
return nil, "failed to encode pagination:" .. err
end

return encode_base64(json)
end
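
For reference, the pagination cursor is plain base64-wrapped JSON. A minimal
round-trip sketch, assuming an OpenResty environment where ngx.encode_base64,
ngx.decode_base64 and cjson.safe are available (the values are made up):

local cjson = require("cjson.safe")

-- encode: what the CP attaches to each non-final page
local status = ngx.encode_base64(cjson.encode({
  version = 100, page_size = 512, next_page = 513,
}))

-- decode: what the CP recovers when the DP echoes the cursor back
local decoded = cjson.decode(ngx.decode_base64(status))
-- decoded.version == 100, decoded.page_size == 512, decoded.next_page == 513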


function _M.new(strategy, opts)
opts = opts or EMPTY

local self = {
strategy = strategy,
page_size = opts.page_size,
}

return setmetatable(self, _MT)
@@ -46,20 +94,78 @@ local function inc_sync_result(res)
end


local function full_sync_result()
local deltas, err = declarative.export_config_sync()
if not deltas then
return nil, err
function _M:full_sync_result(full_sync_status)
local target_version, page_size, next_page = decode_pagination_status(full_sync_status)

-- when target_version is nil, a non-nil second value carries the decode error
if (not target_version) and page_size then
return nil, "communication error: " .. page_size
end

-- try to fetch the cached export for the requested target version
local config_deltas
if target_version then
config_deltas = self.full_sync_cache:get(target_version)
-- the DP requested an unknown version, or the cache entry expired or was
-- evicted; treat it as the first call of a full sync session
if not config_deltas then
ngx_log(ngx_NOTICE, "full sync cache miss for version: ", target_version)
end
end

-- first call of a full sync session: wipe the DP and begin paginating
local first_time = not config_deltas
if first_time then
-- set the target_version for the first time
-- (on failure the second return value is the error message, so
-- target_version carries the error below)
config_deltas, target_version = declarative.export_config_sync()
if not config_deltas then
return nil, target_version
end

-- resty.lrucache's set() never fails (it evicts LRU entries when full),
-- so there is no error to check here
self.full_sync_cache:set(target_version, config_deltas)
end

local begin = next_page or 1
page_size = page_size or self.page_size
next_page = begin + page_size

-- at this point,
-- config_deltas, target_version, page_size, next_page are all guaranteed to be non-nil

-- no more deltas after this page: end the session for the DP
local last_time = next_page > #config_deltas

-- get the deltas for the current page
local deltas, n = {}, 1
for i = begin, next_page - 1 do
local delta = config_deltas[i]
if not delta then
break
end

deltas[n] = delta
n = n + 1
end

-- TODO: handle new deltas that occur during the full sync

local full_sync_status
if not last_time then
full_sync_status = encode_pagination_status(target_version, page_size, next_page)
end

-- wipe dp lmdb, full sync
return { default = { deltas = deltas, wipe = true, }, }
return { default = { deltas = deltas, wipe = first_time, full_sync_done = last_time, full_sync_status = full_sync_status }, }
end
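
To make the paging protocol concrete, these are the approximate wire shapes of
successive kong.sync.v2.get_delta responses within one full sync session (a
sketch derived from the fields above; deltas and cursor strings are
placeholders):

-- first page: opens the session and wipes the DP
local first_page = { default = { deltas = { --[[ ... ]] }, wipe = true,
  full_sync_done = false, full_sync_status = "<base64 cursor>" } }

-- middle pages: no wipe, the cursor advances
local middle_page = { default = { deltas = { --[[ ... ]] }, wipe = false,
  full_sync_done = false, full_sync_status = "<base64 cursor>" } }

-- last page: full_sync_done is true and no cursor is attached
local last_page = { default = { deltas = { --[[ ... ]] }, wipe = false,
  full_sync_done = true } }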


function _M:init_cp(manager)
local purge_delay = manager.conf.cluster_data_plane_purge_delay

self.full_sync_cache = lrucache.new(10)
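-- NOTE (reviewer sketch, not PR code): lrucache.new(10) caps the cache at 10
-- exported snapshots and evicts the least recently used one first; a DP whose
-- cursor points at an evicted version gets a cache miss and transparently
-- restarts its full sync from the first page (see full_sync_result above)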

-- CP
-- Method: kong.sync.v2.get_delta
-- Params: versions: list of current versions of the database
@@ -103,7 +209,8 @@ function _M:init_cp(manager)

-- is the node empty? If so, just do a full sync to bring it up to date faster
if default_namespace_version == 0 or
latest_version - default_namespace_version > FULL_SYNC_THRESHOLD
latest_version - default_namespace_version > FULL_SYNC_THRESHOLD or
default_namespace.full_sync_status
then
-- we need a full sync: the node is empty, too far behind, or resuming one

@@ -112,7 +219,7 @@
", current_version: ", default_namespace_version,
", forcing a full sync")

return full_sync_result()
return self:full_sync_result(default_namespace.full_sync_status)
end

-- do we need an incremental sync?
@@ -199,14 +306,19 @@ local function is_rpc_ready()
end


local function do_sync()
local function do_sync(dp_status)
if not is_rpc_ready() then
return nil, "rpc is not ready"
end

-- while a paginated full sync is in progress, even if an update notification
-- triggers another sync, it will be blocked by the mutex, and the current
-- sync will continue with the remaining pages
local in_full_sync = dp_status.full_sync_status

local msg = { default =
{ version =
tonumber(declarative.get_current_hash()) or 0,
full_sync_status = in_full_sync,
},
}

@@ -217,7 +329,7 @@ local function do_sync()
end

-- ns_deltas should look like:
-- { default = { deltas = { ... }, wipe = true, }, }
-- { default = { deltas = { ... }, wipe = true, full_sync_done = false, full_sync_status = ...}, }

local ns_delta = ns_deltas.default
if not ns_delta then
@@ -231,6 +343,11 @@
return true
end

if ns_delta.full_sync_status then
-- full sync is in progress
in_full_sync = true
end

-- we should find the correct default workspace
-- and replace the old one with it
local default_ws_changed
@@ -247,9 +364,15 @@

local t = txn.begin(512)

-- beginning of the full sync session: wipe the lmdb and purge the caches
local wipe = ns_delta.wipe
if wipe then
t:db_drop(false)
kong.core_cache:purge()
kong.cache:purge()
-- we are in an unready state;
-- consider the config empty
t:set(DECLARATIVE_HASH_KEY, DECLARATIVE_EMPTY_CONFIG_HASH)
end

local db = kong.db
@@ -298,8 +421,8 @@ local function do_sync()
", version: ", delta_version,
", type: ", delta_type)

-- wipe the whole lmdb, should not have events
if not wipe then
-- during the full sync, should not emit events
if not in_full_sync then
ev = { delta_type, old_entity and "update" or "create", delta_entity, old_entity, }
end

@@ -310,8 +433,8 @@
return nil, err
end

-- If we will wipe lmdb, we don't need to delete it from lmdb.
if old_entity and not wipe then
-- during a full sync the lmdb starts empty, so there is nothing to delete
if old_entity and not in_full_sync then
local res, err = delete_entity_for_txn(t, delta_type, old_entity, opts)
if not res then
return nil, err
@@ -323,14 +446,14 @@
", version: ", delta_version,
", type: ", delta_type)

-- wipe the whole lmdb, should not have events
if not wipe then
-- during the full sync, should not emit events
if not in_full_sync then
ev = { delta_type, "delete", old_entity, }
end
end -- if delta_entity ~= nil and delta_entity ~= ngx_null

-- wipe the whole lmdb, should not have events
if not wipe then
-- during the full sync, should not emit events
if not in_full_sync then
crud_events_n = crud_events_n + 1
crud_events[crud_events_n] = ev
end
@@ -343,9 +466,12 @@
end
end -- for _, delta

-- store current sync version
t:set(DECLARATIVE_HASH_KEY, fmt("%032d", version))

-- only update the sync version outside a full sync, or once the full sync is done
if (not in_full_sync) or ns_delta.full_sync_done then
-- store current sync version
t:set(DECLARATIVE_HASH_KEY, fmt("%032d", version))
end

-- store the correct default workspace uuid
if default_ws_changed then
t:set(DECLARATIVE_DEFAULT_WORKSPACE_KEY, kong.default_workspace)
@@ -356,32 +482,32 @@
return nil, err
end

if wipe then
kong.core_cache:purge()
kong.cache:purge()
dp_status.full_sync_status = ns_delta.full_sync_status

-- the full sync is done
if ns_delta.full_sync_done then
-- Trigger other workers' callbacks like reconfigure_handler.
--
-- A full sync may rebuild the router, plugins and balancer, so their
-- hashes are passed as nil.
-- Until this point, the dataplane is not ready to serve requests or to
-- do delta syncs.
local reconfigure_data = { kong.default_workspace, nil, nil, nil, }
local ok, err = events.declarative_reconfigure_notify(reconfigure_data)
if not ok then
return nil, err
end
return events.declarative_reconfigure_notify(reconfigure_data)
end

else
for _, event in ipairs(crud_events) do
-- delta_type, crud_event_type, delta.entity, old_entity
db[event[1]]:post_crud_event(event[2], event[3], event[4])
end
-- emit the CRUD events
-- (if in_full_sync, no events were added to the queue above)
for _, event in ipairs(crud_events) do
-- delta_type, crud_event_type, delta.entity, old_entity
db[event[1]]:post_crud_event(event[2], event[3], event[4])
end

return true
end
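
Putting the DP side together: do_sync applies one page per call, and
sync_handler (below) reschedules itself while a cursor is outstanding, so a
paginated full sync spans several timer runs. A condensed, hypothetical driver
showing the control flow (not PR code; do_sync is file-local, and sync stands
for the instance passed around as dp_status):

local function run_full_sync(sync)
  repeat
    local ok, err = do_sync(sync)  -- pulls and applies one page
    if not ok then
      return nil, err
    end
    -- sync.full_sync_status is the opaque cursor echoed back to the CP;
    -- it stays non-nil until the CP answers with full_sync_done = true
  until not sync.full_sync_status
  return true
end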


local function sync_handler(premature)
local function sync_handler(premature, dp_status)
if premature then
return
end
@@ -391,7 +517,7 @@ local function sync_handler(premature)
-- to CP quickly after sync. (`kong.sync.v2.get_delta` is used both for
-- pulling deltas and for status reporting)
for _ = 1, 2 do
local ok, err = do_sync()
local ok, err = do_sync(dp_status)
if not ok then
return nil, err
end
@@ -402,11 +528,17 @@
if not res and err ~= "timeout" then
ngx_log(ngx_ERR, "unable to create worker mutex and sync: ", err)
end

if dp_status.full_sync_status then
-- full sync is in progress; schedule another round to pull the next page.
-- dp_status is the sync instance carrying the cursor, so reschedule via it
-- rather than via the module table _M
return dp_status:sync_once()
end
end


local function start_sync_timer(timer_func, delay)
local hdl, err = timer_func(delay, sync_handler)
local function start_sync_timer(timer_func, delay, dp_status)
local hdl, err = timer_func(delay, sync_handler, dp_status)

if not hdl then
return nil, err
@@ -417,12 +549,12 @@


function _M:sync_once(delay)
return start_sync_timer(ngx.timer.at, delay or 0)
return start_sync_timer(ngx.timer.at, delay or 0, self)
end


function _M:sync_every(delay)
return start_sync_timer(ngx.timer.every, delay)
return start_sync_timer(ngx.timer.every, delay, self)
end
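
Finally, a usage sketch for the new constructor option (the require path and
numbers are assumptions, not part of this diff). Note that the CP must obtain
a page_size from either opts or the DP's cursor, or the arithmetic in
full_sync_result (next_page = begin + page_size) will raise an error:

-- control plane (strategy and manager are placeholders for what Kong wires in)
local rpc_sync = require("kong.clustering.services.sync.rpc")
local sync = rpc_sync.new(strategy, { page_size = 512 })  -- deltas per page
sync:init_cp(manager)  -- registers kong.sync.v2.get_delta and the LRU cache

-- a data plane instance instead schedules periodic pulls:
-- sync:sync_every(30)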

