From 2058a0669dc049a7ec5e720f3e8c5eaba019eeb4 Mon Sep 17 00:00:00 2001 From: Georgy Moiseev Date: Thu, 18 Aug 2022 15:12:30 +0300 Subject: [PATCH] internal: rework sharding schema reload This patch is the groundwork for vshard groups and custom routers support. After this patch, sharding schema reload works per vshard router object. Test runs have shown that this patch do not affects the performance of crud requests. Part of #44 --- crud/common/sharding/init.lua | 6 +- .../common/sharding/router_metadata_cache.lua | 50 +++++++++++---- crud/common/sharding/sharding_func.lua | 13 ++-- crud/common/sharding/sharding_key.lua | 21 ++++--- crud/common/sharding/sharding_metadata.lua | 63 +++++++++++-------- crud/count.lua | 2 +- crud/delete.lua | 2 +- crud/get.lua | 2 +- crud/select/compat/select.lua | 2 +- crud/select/compat/select_old.lua | 2 +- crud/update.lua | 2 +- test/helper.lua | 12 +++- test/unit/sharding_metadata_test.lua | 10 +-- 13 files changed, 122 insertions(+), 65 deletions(-) diff --git a/crud/common/sharding/init.lua b/crud/common/sharding/init.lua index aaf79015..200dd12b 100644 --- a/crud/common/sharding/init.lua +++ b/crud/common/sharding/init.lua @@ -32,7 +32,8 @@ function sharding.key_get_bucket_id(space_name, key, specified_bucket_id) return { bucket_id = specified_bucket_id } end - local sharding_func_data, err = sharding_metadata_module.fetch_sharding_func_on_router(space_name) + local vshard_router = vshard.router.static + local sharding_func_data, err = sharding_metadata_module.fetch_sharding_func_on_router(vshard_router, space_name) if err ~= nil then return nil, err end @@ -53,7 +54,8 @@ function sharding.tuple_get_bucket_id(tuple, space, specified_bucket_id) end local sharding_index_parts = space.index[0].parts - local sharding_key_data, err = sharding_metadata_module.fetch_sharding_key_on_router(space.name) + local vshard_router = vshard.router.static + local sharding_key_data, err = sharding_metadata_module.fetch_sharding_key_on_router(vshard_router, space.name) if err ~= nil then return nil, err end diff --git a/crud/common/sharding/router_metadata_cache.lua b/crud/common/sharding/router_metadata_cache.lua index a6496f00..8da5fd20 100644 --- a/crud/common/sharding/router_metadata_cache.lua +++ b/crud/common/sharding/router_metadata_cache.lua @@ -5,21 +5,47 @@ local router_metadata_cache = {} router_metadata_cache.SHARDING_KEY_MAP_NAME = "sharding_key_as_index_obj_map" router_metadata_cache.SHARDING_FUNC_MAP_NAME = "sharding_func_map" router_metadata_cache.META_HASH_MAP_NAME = "sharding_meta_hash_map" -router_metadata_cache[router_metadata_cache.SHARDING_KEY_MAP_NAME] = nil -router_metadata_cache[router_metadata_cache.SHARDING_FUNC_MAP_NAME] = nil -router_metadata_cache[router_metadata_cache.META_HASH_MAP_NAME] = {} -router_metadata_cache.fetch_lock = fiber.channel(1) -router_metadata_cache.is_part_of_pk = {} + +local internal_storage = {} + +function router_metadata_cache.get_instance(vshard_router) + local name = vshard_router.name + + if internal_storage[name] ~= nil then + return internal_storage[name] + end + + internal_storage[name] = { + [router_metadata_cache.SHARDING_KEY_MAP_NAME] = nil, + [router_metadata_cache.SHARDING_FUNC_MAP_NAME] = nil, + [router_metadata_cache.META_HASH_MAP_NAME] = {}, + fetch_lock = fiber.channel(1), + is_part_of_pk = {} + } + + return internal_storage[name] +end + +function router_metadata_cache.drop_instance(vshard_router) + local name = vshard_router.name + + if internal_storage[name] == nil then + return + end + + if internal_storage[name].fetch_lock ~= nil then + internal_storage[name].fetch_lock:close() + end + + internal_storage[name] = nil +end function router_metadata_cache.drop_caches() - router_metadata_cache[router_metadata_cache.SHARDING_KEY_MAP_NAME] = nil - router_metadata_cache[router_metadata_cache.SHARDING_FUNC_MAP_NAME] = nil - router_metadata_cache[router_metadata_cache.META_HASH_MAP_NAME] = {} - if router_metadata_cache.fetch_lock ~= nil then - router_metadata_cache.fetch_lock:close() + for name, _ in pairs(internal_storage) do + router_metadata_cache.drop_instance(name) end - router_metadata_cache.fetch_lock = fiber.channel(1) - router_metadata_cache.is_part_of_pk = {} + + internal_storage = {} end return router_metadata_cache \ No newline at end of file diff --git a/crud/common/sharding/sharding_func.lua b/crud/common/sharding/sharding_func.lua index 82a6cf4c..9fc7caca 100644 --- a/crud/common/sharding/sharding_func.lua +++ b/crud/common/sharding/sharding_func.lua @@ -1,8 +1,9 @@ local errors = require('errors') local log = require('log') +local vshard = require('vshard') local dev_checks = require('crud.common.dev_checks') -local cache = require('crud.common.sharding.router_metadata_cache') +local router_cache = require('crud.common.sharding.router_metadata_cache') local utils = require('crud.common.utils') local ShardingFuncError = errors.new_class('ShardingFuncError', {capture_stack = false}) @@ -106,11 +107,13 @@ function sharding_func_module.construct_as_callable_obj_cache(metadata_map, spec local result_err - cache[cache.SHARDING_FUNC_MAP_NAME] = {} - local func_cache = cache[cache.SHARDING_FUNC_MAP_NAME] + local vshard_router = vshard.router.static + local cache = router_cache.get_instance(vshard_router) + cache[router_cache.SHARDING_FUNC_MAP_NAME] = {} + local func_cache = cache[router_cache.SHARDING_FUNC_MAP_NAME] - cache[cache.META_HASH_MAP_NAME][cache.SHARDING_FUNC_MAP_NAME] = {} - local func_hash_cache = cache[cache.META_HASH_MAP_NAME][cache.SHARDING_FUNC_MAP_NAME] + cache[router_cache.META_HASH_MAP_NAME][router_cache.SHARDING_FUNC_MAP_NAME] = {} + local func_hash_cache = cache[router_cache.META_HASH_MAP_NAME][router_cache.SHARDING_FUNC_MAP_NAME] for space_name, metadata in pairs(metadata_map) do if metadata.sharding_func_def ~= nil then diff --git a/crud/common/sharding/sharding_key.lua b/crud/common/sharding/sharding_key.lua index 635c3e34..800c1f3f 100644 --- a/crud/common/sharding/sharding_key.lua +++ b/crud/common/sharding/sharding_key.lua @@ -1,8 +1,9 @@ local errors = require('errors') local log = require('log') +local vshard = require('vshard') local dev_checks = require('crud.common.dev_checks') -local cache = require('crud.common.sharding.router_metadata_cache') +local router_cache = require('crud.common.sharding.router_metadata_cache') local utils = require('crud.common.utils') local ShardingKeyError = errors.new_class("ShardingKeyError", {capture_stack = false}) @@ -30,8 +31,8 @@ local function as_index_object(space_name, space_format, sharding_key_def) end -- Make sure sharding key definition is a part of primary key. -local function is_part_of_pk(space_name, primary_index_parts, sharding_key_as_index_obj) - dev_checks('string', 'table', 'table') +local function is_part_of_pk(cache, space_name, primary_index_parts, sharding_key_as_index_obj) + dev_checks('table', 'string', 'table', 'table') if cache.is_part_of_pk[space_name] ~= nil then return cache.is_part_of_pk[space_name] @@ -83,7 +84,9 @@ function sharding_key_module.extract_from_pk(space_name, sharding_key_as_index_o return primary_key end - local res = is_part_of_pk(space_name, primary_index_parts, sharding_key_as_index_obj) + local vshard_router = vshard.router.static + local cache = router_cache.get_instance(vshard_router) + local res = is_part_of_pk(cache, space_name, primary_index_parts, sharding_key_as_index_obj) if res == false then return nil, ShardingKeyError:new( "Sharding key for space %q is missed in primary index, specify bucket_id", @@ -102,11 +105,13 @@ function sharding_key_module.construct_as_index_obj_cache(metadata_map, specifie local result_err - cache[cache.SHARDING_KEY_MAP_NAME] = {} - local key_cache = cache[cache.SHARDING_KEY_MAP_NAME] + local vshard_router = vshard.router.static + local cache = router_cache.get_instance(vshard_router) + cache[router_cache.SHARDING_KEY_MAP_NAME] = {} + local key_cache = cache[router_cache.SHARDING_KEY_MAP_NAME] - cache[cache.META_HASH_MAP_NAME][cache.SHARDING_KEY_MAP_NAME] = {} - local key_hash_cache = cache[cache.META_HASH_MAP_NAME][cache.SHARDING_KEY_MAP_NAME] + cache[router_cache.META_HASH_MAP_NAME][router_cache.SHARDING_KEY_MAP_NAME] = {} + local key_hash_cache = cache[router_cache.META_HASH_MAP_NAME][router_cache.SHARDING_KEY_MAP_NAME] for space_name, metadata in pairs(metadata_map) do if metadata.sharding_key_def ~= nil then diff --git a/crud/common/sharding/sharding_metadata.lua b/crud/common/sharding/sharding_metadata.lua index 7df5d0d6..3c3e9ff8 100644 --- a/crud/common/sharding/sharding_metadata.lua +++ b/crud/common/sharding/sharding_metadata.lua @@ -1,11 +1,12 @@ local fiber = require('fiber') local errors = require('errors') local log = require('log') +local vshard = require('vshard') local call = require('crud.common.call') local const = require('crud.common.const') local dev_checks = require('crud.common.dev_checks') -local cache = require('crud.common.sharding.router_metadata_cache') +local router_cache = require('crud.common.sharding.router_metadata_cache') local storage_cache = require('crud.common.sharding.storage_metadata_cache') local sharding_func = require('crud.common.sharding.sharding_func') local sharding_key = require('crud.common.sharding.sharding_key') @@ -22,8 +23,11 @@ local sharding_metadata_module = {} local function locked(f) dev_checks('function') - return function(timeout, ...) + return function(timeout, vshard_router, ...) local timeout_deadline = fiber.clock() + timeout + + local cache = router_cache.get_instance(vshard_router) + local ok = cache.fetch_lock:put(true, timeout) -- channel:put() returns false in two cases: when timeout is exceeded -- or channel has been closed. However error message describes only @@ -34,7 +38,7 @@ local function locked(f) "Timeout for fetching sharding metadata is exceeded") end local timeout = timeout_deadline - fiber.clock() - local status, err = pcall(f, timeout, ...) + local status, err = pcall(f, timeout, vshard_router, ...) cache.fetch_lock:get() if not status or err ~= nil then return err @@ -95,8 +99,10 @@ end -- cache.fetch_lock become unlocked during timeout passed to -- _fetch_on_router(). -- metadata_map_name == nil means forced reload. -local _fetch_on_router = locked(function(timeout, space_name, metadata_map_name) - dev_checks('number', 'string', '?string') +local _fetch_on_router = locked(function(timeout, vshard_router, space_name, metadata_map_name) + dev_checks('number', 'table', 'string', '?string') + + local cache = router_cache.get_instance(vshard_router) if (metadata_map_name ~= nil) and (cache[metadata_map_name]) ~= nil then return @@ -109,11 +115,11 @@ local _fetch_on_router = locked(function(timeout, space_name, metadata_map_name) return err end if metadata_map == nil then - cache[cache.SHARDING_KEY_MAP_NAME] = {} - cache[cache.SHARDING_FUNC_MAP_NAME] = {} - cache[cache.META_HASH_MAP_NAME] = { - [cache.SHARDING_KEY_MAP_NAME] = {}, - [cache.SHARDING_FUNC_MAP_NAME] = {}, + cache[router_cache.SHARDING_KEY_MAP_NAME] = {} + cache[router_cache.SHARDING_FUNC_MAP_NAME] = {} + cache[router_cache.META_HASH_MAP_NAME] = { + [router_cache.SHARDING_KEY_MAP_NAME] = {}, + [router_cache.SHARDING_FUNC_MAP_NAME] = {}, } return end @@ -129,16 +135,18 @@ local _fetch_on_router = locked(function(timeout, space_name, metadata_map_name) end end) -local function fetch_on_router(space_name, metadata_map_name, timeout) +local function fetch_on_router(vshard_router, space_name, metadata_map_name, timeout) + local cache = router_cache.get_instance(vshard_router) + if cache[metadata_map_name] ~= nil then return { value = cache[metadata_map_name][space_name], - hash = cache[cache.META_HASH_MAP_NAME][metadata_map_name][space_name] + hash = cache[router_cache.META_HASH_MAP_NAME][metadata_map_name][space_name] } end local timeout = timeout or const.FETCH_SHARDING_METADATA_TIMEOUT - local err = _fetch_on_router(timeout, space_name, metadata_map_name) + local err = _fetch_on_router(timeout, vshard_router, space_name, metadata_map_name) if err ~= nil then return nil, err end @@ -146,7 +154,7 @@ local function fetch_on_router(space_name, metadata_map_name, timeout) if cache[metadata_map_name] ~= nil then return { value = cache[metadata_map_name][space_name], - hash = cache[cache.META_HASH_MAP_NAME][metadata_map_name][space_name], + hash = cache[router_cache.META_HASH_MAP_NAME][metadata_map_name][space_name], } end @@ -163,10 +171,10 @@ end -- that nil without error is a successfull return value. -- - nil and error, when something goes wrong on fetching attempt. -- -function sharding_metadata_module.fetch_sharding_key_on_router(space_name, timeout) - dev_checks('string', '?number') +function sharding_metadata_module.fetch_sharding_key_on_router(vshard_router, space_name, timeout) + dev_checks('table', 'string', '?number') - return fetch_on_router(space_name, cache.SHARDING_KEY_MAP_NAME, timeout) + return fetch_on_router(vshard_router, space_name, router_cache.SHARDING_KEY_MAP_NAME, timeout) end -- Get sharding func for a certain space. @@ -178,28 +186,31 @@ end -- that nil without error is a successfull return value. -- - nil and error, when something goes wrong on fetching attempt. -- -function sharding_metadata_module.fetch_sharding_func_on_router(space_name, timeout) - dev_checks('string', '?number') +function sharding_metadata_module.fetch_sharding_func_on_router(vshard_router, space_name, timeout) + dev_checks('table', 'string', '?number') - return fetch_on_router(space_name, cache.SHARDING_FUNC_MAP_NAME, timeout) + return fetch_on_router(vshard_router, space_name, router_cache.SHARDING_FUNC_MAP_NAME, timeout) end function sharding_metadata_module.update_sharding_key_cache(space_name) - cache.drop_caches() + local vshard_router = vshard.router.static + router_cache.drop_instance(vshard_router) - return sharding_metadata_module.fetch_sharding_key_on_router(space_name) + return sharding_metadata_module.fetch_sharding_key_on_router(vshard_router, space_name) end function sharding_metadata_module.update_sharding_func_cache(space_name) - cache.drop_caches() + local vshard_router = vshard.router.static + router_cache.drop_instance(vshard_router) - return sharding_metadata_module.fetch_sharding_func_on_router(space_name) + return sharding_metadata_module.fetch_sharding_func_on_router(vshard_router, space_name) end function sharding_metadata_module.reload_sharding_cache(space_name) - cache.drop_caches() + local vshard_router = vshard.router.static + router_cache.drop_instance(vshard_router) - local err = _fetch_on_router(const.FETCH_SHARDING_METADATA_TIMEOUT, space_name, nil) + local err = _fetch_on_router(const.FETCH_SHARDING_METADATA_TIMEOUT, vshard_router, space_name, nil) if err ~= nil then log.warn('Failed to reload sharding cache: %s', err) end diff --git a/crud/count.lua b/crud/count.lua index 332fcbaa..bbab947d 100644 --- a/crud/count.lua +++ b/crud/count.lua @@ -151,7 +151,7 @@ local function call_count_on_router(space_name, user_conditions, opts) -- We don't need sharding info if bucket_id specified. if opts.bucket_id == nil then - sharding_key_data, err = sharding_metadata_module.fetch_sharding_key_on_router(space_name) + sharding_key_data, err = sharding_metadata_module.fetch_sharding_key_on_router(vshard_router, space_name) if err ~= nil then return nil, err end diff --git a/crud/delete.lua b/crud/delete.lua index 7cee6b09..e2d317fc 100644 --- a/crud/delete.lua +++ b/crud/delete.lua @@ -81,7 +81,7 @@ local function call_delete_on_router(space_name, key, opts) if opts.bucket_id == nil then local primary_index_parts = space.index[0].parts - local sharding_key_data, err = sharding_metadata_module.fetch_sharding_key_on_router(space_name) + local sharding_key_data, err = sharding_metadata_module.fetch_sharding_key_on_router(vshard_router, space_name) if err ~= nil then return nil, err end diff --git a/crud/get.lua b/crud/get.lua index 0d84e134..0bffce48 100644 --- a/crud/get.lua +++ b/crud/get.lua @@ -84,7 +84,7 @@ local function call_get_on_router(space_name, key, opts) if opts.bucket_id == nil then local primary_index_parts = space.index[0].parts - local sharding_key_data, err = sharding_metadata_module.fetch_sharding_key_on_router(space_name) + local sharding_key_data, err = sharding_metadata_module.fetch_sharding_key_on_router(vshard_router, space_name) if err ~= nil then return nil, err end diff --git a/crud/select/compat/select.lua b/crud/select/compat/select.lua index e904793b..d16e0823 100644 --- a/crud/select/compat/select.lua +++ b/crud/select/compat/select.lua @@ -61,7 +61,7 @@ local function build_select_iterator(space_name, user_conditions, opts) -- We don't need sharding info if bucket_id specified. if opts.bucket_id == nil then - sharding_key_data, err = sharding_metadata_module.fetch_sharding_key_on_router(space_name) + sharding_key_data, err = sharding_metadata_module.fetch_sharding_key_on_router(vshard_router, space_name) if err ~= nil then return nil, err end diff --git a/crud/select/compat/select_old.lua b/crud/select/compat/select_old.lua index 928cac7e..2aea31b9 100644 --- a/crud/select/compat/select_old.lua +++ b/crud/select/compat/select_old.lua @@ -123,7 +123,7 @@ local function build_select_iterator(space_name, user_conditions, opts) local sharding_key_as_index_obj = nil -- We don't need sharding info if bucket_id specified. if opts.bucket_id == nil then - local sharding_key_data, err = sharding_metadata_module.fetch_sharding_key_on_router(space_name) + local sharding_key_data, err = sharding_metadata_module.fetch_sharding_key_on_router(vshard_router, space_name) if err ~= nil then return nil, err end diff --git a/crud/update.lua b/crud/update.lua index 4b1a18be..3ed058cc 100644 --- a/crud/update.lua +++ b/crud/update.lua @@ -109,7 +109,7 @@ local function call_update_on_router(space_name, key, user_operations, opts) if opts.bucket_id == nil then local primary_index_parts = space.index[0].parts - local sharding_key_data, err = sharding_metadata_module.fetch_sharding_key_on_router(space_name) + local sharding_key_data, err = sharding_metadata_module.fetch_sharding_key_on_router(vshard_router, space_name) if err ~= nil then return nil, err end diff --git a/test/helper.lua b/test/helper.lua index 99a83a13..1f9bebc7 100644 --- a/test/helper.lua +++ b/test/helper.lua @@ -334,9 +334,13 @@ end function helpers.get_sharding_key_cache(cluster) return cluster.main_server.net_box:eval([[ + local vshard = require('vshard') local sharding_metadata_cache = require('crud.common.sharding.router_metadata_cache') - return sharding_metadata_cache[sharding_metadata_cache.SHARDING_KEY_MAP_NAME] + local vshard_router = vshard.router.static + local cache = sharding_metadata_cache.get_instance(vshard_router) + + return cache[sharding_metadata_cache.SHARDING_KEY_MAP_NAME] ]]) end @@ -362,9 +366,13 @@ end -- but not the cache itself function helpers.get_sharding_func_cache_size(cluster) return cluster.main_server.net_box:eval([[ + local vshard = require('vshard') local sharding_metadata_cache = require('crud.common.sharding.router_metadata_cache') - local cache, err = sharding_metadata_cache[sharding_metadata_cache.SHARDING_FUNC_MAP_NAME] + local vshard_router = vshard.router.static + local instance_cache = sharding_metadata_cache.get_instance(vshard_router) + + local cache, err = instance_cache[sharding_metadata_cache.SHARDING_FUNC_MAP_NAME] if cache == nil then return nil, err end diff --git a/test/unit/sharding_metadata_test.lua b/test/unit/sharding_metadata_test.lua index e049c28e..ad321d1e 100644 --- a/test/unit/sharding_metadata_test.lua +++ b/test/unit/sharding_metadata_test.lua @@ -4,7 +4,7 @@ local sharding_metadata_module = require('crud.common.sharding.sharding_metadata local sharding_key_module = require('crud.common.sharding.sharding_key') local sharding_func_module = require('crud.common.sharding.sharding_func') local sharding_utils = require('crud.common.sharding.utils') -local cache = require('crud.common.sharding.router_metadata_cache') +local router_cache = require('crud.common.sharding.router_metadata_cache') local storage_cache = require('crud.common.sharding.storage_metadata_cache') local utils = require('crud.common.utils') @@ -56,7 +56,7 @@ g.after_each(function() end box.space.fetch_on_storage:drop() - cache.drop_caches() + router_cache.drop_caches() storage_cache.drop_caches() end) @@ -298,7 +298,8 @@ g.test_is_part_of_pk_positive = function() } local is_part_of_pk = sharding_key_module.internal.is_part_of_pk - local res = is_part_of_pk(space_name, index_parts, sharding_key_as_index_obj) + local cache = router_cache.get_instance({name = 'dummy'}) + local res = is_part_of_pk(cache, space_name, index_parts, sharding_key_as_index_obj) t.assert_equals(res, true) end @@ -315,7 +316,8 @@ g.test_is_part_of_pk_negative = function() } local is_part_of_pk = sharding_key_module.internal.is_part_of_pk - local res = is_part_of_pk(space_name, index_parts, sharding_key_as_index_obj) + local cache = router_cache.get_instance({name = 'dummy'}) + local res = is_part_of_pk(cache, space_name, index_parts, sharding_key_as_index_obj) t.assert_equals(res, false) end