diff --git a/crud/common/sharding/init.lua b/crud/common/sharding/init.lua index aaf79015..200dd12b 100644 --- a/crud/common/sharding/init.lua +++ b/crud/common/sharding/init.lua @@ -32,7 +32,8 @@ function sharding.key_get_bucket_id(space_name, key, specified_bucket_id) return { bucket_id = specified_bucket_id } end - local sharding_func_data, err = sharding_metadata_module.fetch_sharding_func_on_router(space_name) + local vshard_router = vshard.router.static + local sharding_func_data, err = sharding_metadata_module.fetch_sharding_func_on_router(vshard_router, space_name) if err ~= nil then return nil, err end @@ -53,7 +54,8 @@ function sharding.tuple_get_bucket_id(tuple, space, specified_bucket_id) end local sharding_index_parts = space.index[0].parts - local sharding_key_data, err = sharding_metadata_module.fetch_sharding_key_on_router(space.name) + local vshard_router = vshard.router.static + local sharding_key_data, err = sharding_metadata_module.fetch_sharding_key_on_router(vshard_router, space.name) if err ~= nil then return nil, err end diff --git a/crud/common/sharding/router_metadata_cache.lua b/crud/common/sharding/router_metadata_cache.lua index a6496f00..8da5fd20 100644 --- a/crud/common/sharding/router_metadata_cache.lua +++ b/crud/common/sharding/router_metadata_cache.lua @@ -5,21 +5,47 @@ local router_metadata_cache = {} router_metadata_cache.SHARDING_KEY_MAP_NAME = "sharding_key_as_index_obj_map" router_metadata_cache.SHARDING_FUNC_MAP_NAME = "sharding_func_map" router_metadata_cache.META_HASH_MAP_NAME = "sharding_meta_hash_map" -router_metadata_cache[router_metadata_cache.SHARDING_KEY_MAP_NAME] = nil -router_metadata_cache[router_metadata_cache.SHARDING_FUNC_MAP_NAME] = nil -router_metadata_cache[router_metadata_cache.META_HASH_MAP_NAME] = {} -router_metadata_cache.fetch_lock = fiber.channel(1) -router_metadata_cache.is_part_of_pk = {} + +local internal_storage = {} + +function router_metadata_cache.get_instance(vshard_router) + local name = vshard_router.name + + if internal_storage[name] ~= nil then + return internal_storage[name] + end + + internal_storage[name] = { + [router_metadata_cache.SHARDING_KEY_MAP_NAME] = nil, + [router_metadata_cache.SHARDING_FUNC_MAP_NAME] = nil, + [router_metadata_cache.META_HASH_MAP_NAME] = {}, + fetch_lock = fiber.channel(1), + is_part_of_pk = {} + } + + return internal_storage[name] +end + +function router_metadata_cache.drop_instance(vshard_router) + local name = vshard_router.name + + if internal_storage[name] == nil then + return + end + + if internal_storage[name].fetch_lock ~= nil then + internal_storage[name].fetch_lock:close() + end + + internal_storage[name] = nil +end function router_metadata_cache.drop_caches() - router_metadata_cache[router_metadata_cache.SHARDING_KEY_MAP_NAME] = nil - router_metadata_cache[router_metadata_cache.SHARDING_FUNC_MAP_NAME] = nil - router_metadata_cache[router_metadata_cache.META_HASH_MAP_NAME] = {} - if router_metadata_cache.fetch_lock ~= nil then - router_metadata_cache.fetch_lock:close() + for name, _ in pairs(internal_storage) do + router_metadata_cache.drop_instance(name) end - router_metadata_cache.fetch_lock = fiber.channel(1) - router_metadata_cache.is_part_of_pk = {} + + internal_storage = {} end return router_metadata_cache \ No newline at end of file diff --git a/crud/common/sharding/sharding_func.lua b/crud/common/sharding/sharding_func.lua index 82a6cf4c..9fc7caca 100644 --- a/crud/common/sharding/sharding_func.lua +++ b/crud/common/sharding/sharding_func.lua @@ -1,8 +1,9 @@ local errors = require('errors') local log = require('log') +local vshard = require('vshard') local dev_checks = require('crud.common.dev_checks') -local cache = require('crud.common.sharding.router_metadata_cache') +local router_cache = require('crud.common.sharding.router_metadata_cache') local utils = require('crud.common.utils') local ShardingFuncError = errors.new_class('ShardingFuncError', {capture_stack = false}) @@ -106,11 +107,13 @@ function sharding_func_module.construct_as_callable_obj_cache(metadata_map, spec local result_err - cache[cache.SHARDING_FUNC_MAP_NAME] = {} - local func_cache = cache[cache.SHARDING_FUNC_MAP_NAME] + local vshard_router = vshard.router.static + local cache = router_cache.get_instance(vshard_router) + cache[router_cache.SHARDING_FUNC_MAP_NAME] = {} + local func_cache = cache[router_cache.SHARDING_FUNC_MAP_NAME] - cache[cache.META_HASH_MAP_NAME][cache.SHARDING_FUNC_MAP_NAME] = {} - local func_hash_cache = cache[cache.META_HASH_MAP_NAME][cache.SHARDING_FUNC_MAP_NAME] + cache[router_cache.META_HASH_MAP_NAME][router_cache.SHARDING_FUNC_MAP_NAME] = {} + local func_hash_cache = cache[router_cache.META_HASH_MAP_NAME][router_cache.SHARDING_FUNC_MAP_NAME] for space_name, metadata in pairs(metadata_map) do if metadata.sharding_func_def ~= nil then diff --git a/crud/common/sharding/sharding_key.lua b/crud/common/sharding/sharding_key.lua index 635c3e34..800c1f3f 100644 --- a/crud/common/sharding/sharding_key.lua +++ b/crud/common/sharding/sharding_key.lua @@ -1,8 +1,9 @@ local errors = require('errors') local log = require('log') +local vshard = require('vshard') local dev_checks = require('crud.common.dev_checks') -local cache = require('crud.common.sharding.router_metadata_cache') +local router_cache = require('crud.common.sharding.router_metadata_cache') local utils = require('crud.common.utils') local ShardingKeyError = errors.new_class("ShardingKeyError", {capture_stack = false}) @@ -30,8 +31,8 @@ local function as_index_object(space_name, space_format, sharding_key_def) end -- Make sure sharding key definition is a part of primary key. -local function is_part_of_pk(space_name, primary_index_parts, sharding_key_as_index_obj) - dev_checks('string', 'table', 'table') +local function is_part_of_pk(cache, space_name, primary_index_parts, sharding_key_as_index_obj) + dev_checks('table', 'string', 'table', 'table') if cache.is_part_of_pk[space_name] ~= nil then return cache.is_part_of_pk[space_name] @@ -83,7 +84,9 @@ function sharding_key_module.extract_from_pk(space_name, sharding_key_as_index_o return primary_key end - local res = is_part_of_pk(space_name, primary_index_parts, sharding_key_as_index_obj) + local vshard_router = vshard.router.static + local cache = router_cache.get_instance(vshard_router) + local res = is_part_of_pk(cache, space_name, primary_index_parts, sharding_key_as_index_obj) if res == false then return nil, ShardingKeyError:new( "Sharding key for space %q is missed in primary index, specify bucket_id", @@ -102,11 +105,13 @@ function sharding_key_module.construct_as_index_obj_cache(metadata_map, specifie local result_err - cache[cache.SHARDING_KEY_MAP_NAME] = {} - local key_cache = cache[cache.SHARDING_KEY_MAP_NAME] + local vshard_router = vshard.router.static + local cache = router_cache.get_instance(vshard_router) + cache[router_cache.SHARDING_KEY_MAP_NAME] = {} + local key_cache = cache[router_cache.SHARDING_KEY_MAP_NAME] - cache[cache.META_HASH_MAP_NAME][cache.SHARDING_KEY_MAP_NAME] = {} - local key_hash_cache = cache[cache.META_HASH_MAP_NAME][cache.SHARDING_KEY_MAP_NAME] + cache[router_cache.META_HASH_MAP_NAME][router_cache.SHARDING_KEY_MAP_NAME] = {} + local key_hash_cache = cache[router_cache.META_HASH_MAP_NAME][router_cache.SHARDING_KEY_MAP_NAME] for space_name, metadata in pairs(metadata_map) do if metadata.sharding_key_def ~= nil then diff --git a/crud/common/sharding/sharding_metadata.lua b/crud/common/sharding/sharding_metadata.lua index 7df5d0d6..3c3e9ff8 100644 --- a/crud/common/sharding/sharding_metadata.lua +++ b/crud/common/sharding/sharding_metadata.lua @@ -1,11 +1,12 @@ local fiber = require('fiber') local errors = require('errors') local log = require('log') +local vshard = require('vshard') local call = require('crud.common.call') local const = require('crud.common.const') local dev_checks = require('crud.common.dev_checks') -local cache = require('crud.common.sharding.router_metadata_cache') +local router_cache = require('crud.common.sharding.router_metadata_cache') local storage_cache = require('crud.common.sharding.storage_metadata_cache') local sharding_func = require('crud.common.sharding.sharding_func') local sharding_key = require('crud.common.sharding.sharding_key') @@ -22,8 +23,11 @@ local sharding_metadata_module = {} local function locked(f) dev_checks('function') - return function(timeout, ...) + return function(timeout, vshard_router, ...) local timeout_deadline = fiber.clock() + timeout + + local cache = router_cache.get_instance(vshard_router) + local ok = cache.fetch_lock:put(true, timeout) -- channel:put() returns false in two cases: when timeout is exceeded -- or channel has been closed. However error message describes only @@ -34,7 +38,7 @@ local function locked(f) "Timeout for fetching sharding metadata is exceeded") end local timeout = timeout_deadline - fiber.clock() - local status, err = pcall(f, timeout, ...) + local status, err = pcall(f, timeout, vshard_router, ...) cache.fetch_lock:get() if not status or err ~= nil then return err @@ -95,8 +99,10 @@ end -- cache.fetch_lock become unlocked during timeout passed to -- _fetch_on_router(). -- metadata_map_name == nil means forced reload. -local _fetch_on_router = locked(function(timeout, space_name, metadata_map_name) - dev_checks('number', 'string', '?string') +local _fetch_on_router = locked(function(timeout, vshard_router, space_name, metadata_map_name) + dev_checks('number', 'table', 'string', '?string') + + local cache = router_cache.get_instance(vshard_router) if (metadata_map_name ~= nil) and (cache[metadata_map_name]) ~= nil then return @@ -109,11 +115,11 @@ local _fetch_on_router = locked(function(timeout, space_name, metadata_map_name) return err end if metadata_map == nil then - cache[cache.SHARDING_KEY_MAP_NAME] = {} - cache[cache.SHARDING_FUNC_MAP_NAME] = {} - cache[cache.META_HASH_MAP_NAME] = { - [cache.SHARDING_KEY_MAP_NAME] = {}, - [cache.SHARDING_FUNC_MAP_NAME] = {}, + cache[router_cache.SHARDING_KEY_MAP_NAME] = {} + cache[router_cache.SHARDING_FUNC_MAP_NAME] = {} + cache[router_cache.META_HASH_MAP_NAME] = { + [router_cache.SHARDING_KEY_MAP_NAME] = {}, + [router_cache.SHARDING_FUNC_MAP_NAME] = {}, } return end @@ -129,16 +135,18 @@ local _fetch_on_router = locked(function(timeout, space_name, metadata_map_name) end end) -local function fetch_on_router(space_name, metadata_map_name, timeout) +local function fetch_on_router(vshard_router, space_name, metadata_map_name, timeout) + local cache = router_cache.get_instance(vshard_router) + if cache[metadata_map_name] ~= nil then return { value = cache[metadata_map_name][space_name], - hash = cache[cache.META_HASH_MAP_NAME][metadata_map_name][space_name] + hash = cache[router_cache.META_HASH_MAP_NAME][metadata_map_name][space_name] } end local timeout = timeout or const.FETCH_SHARDING_METADATA_TIMEOUT - local err = _fetch_on_router(timeout, space_name, metadata_map_name) + local err = _fetch_on_router(timeout, vshard_router, space_name, metadata_map_name) if err ~= nil then return nil, err end @@ -146,7 +154,7 @@ local function fetch_on_router(space_name, metadata_map_name, timeout) if cache[metadata_map_name] ~= nil then return { value = cache[metadata_map_name][space_name], - hash = cache[cache.META_HASH_MAP_NAME][metadata_map_name][space_name], + hash = cache[router_cache.META_HASH_MAP_NAME][metadata_map_name][space_name], } end @@ -163,10 +171,10 @@ end -- that nil without error is a successfull return value. -- - nil and error, when something goes wrong on fetching attempt. -- -function sharding_metadata_module.fetch_sharding_key_on_router(space_name, timeout) - dev_checks('string', '?number') +function sharding_metadata_module.fetch_sharding_key_on_router(vshard_router, space_name, timeout) + dev_checks('table', 'string', '?number') - return fetch_on_router(space_name, cache.SHARDING_KEY_MAP_NAME, timeout) + return fetch_on_router(vshard_router, space_name, router_cache.SHARDING_KEY_MAP_NAME, timeout) end -- Get sharding func for a certain space. @@ -178,28 +186,31 @@ end -- that nil without error is a successfull return value. -- - nil and error, when something goes wrong on fetching attempt. -- -function sharding_metadata_module.fetch_sharding_func_on_router(space_name, timeout) - dev_checks('string', '?number') +function sharding_metadata_module.fetch_sharding_func_on_router(vshard_router, space_name, timeout) + dev_checks('table', 'string', '?number') - return fetch_on_router(space_name, cache.SHARDING_FUNC_MAP_NAME, timeout) + return fetch_on_router(vshard_router, space_name, router_cache.SHARDING_FUNC_MAP_NAME, timeout) end function sharding_metadata_module.update_sharding_key_cache(space_name) - cache.drop_caches() + local vshard_router = vshard.router.static + router_cache.drop_instance(vshard_router) - return sharding_metadata_module.fetch_sharding_key_on_router(space_name) + return sharding_metadata_module.fetch_sharding_key_on_router(vshard_router, space_name) end function sharding_metadata_module.update_sharding_func_cache(space_name) - cache.drop_caches() + local vshard_router = vshard.router.static + router_cache.drop_instance(vshard_router) - return sharding_metadata_module.fetch_sharding_func_on_router(space_name) + return sharding_metadata_module.fetch_sharding_func_on_router(vshard_router, space_name) end function sharding_metadata_module.reload_sharding_cache(space_name) - cache.drop_caches() + local vshard_router = vshard.router.static + router_cache.drop_instance(vshard_router) - local err = _fetch_on_router(const.FETCH_SHARDING_METADATA_TIMEOUT, space_name, nil) + local err = _fetch_on_router(const.FETCH_SHARDING_METADATA_TIMEOUT, vshard_router, space_name, nil) if err ~= nil then log.warn('Failed to reload sharding cache: %s', err) end diff --git a/crud/count.lua b/crud/count.lua index 332fcbaa..bbab947d 100644 --- a/crud/count.lua +++ b/crud/count.lua @@ -151,7 +151,7 @@ local function call_count_on_router(space_name, user_conditions, opts) -- We don't need sharding info if bucket_id specified. if opts.bucket_id == nil then - sharding_key_data, err = sharding_metadata_module.fetch_sharding_key_on_router(space_name) + sharding_key_data, err = sharding_metadata_module.fetch_sharding_key_on_router(vshard_router, space_name) if err ~= nil then return nil, err end diff --git a/crud/delete.lua b/crud/delete.lua index 7cee6b09..e2d317fc 100644 --- a/crud/delete.lua +++ b/crud/delete.lua @@ -81,7 +81,7 @@ local function call_delete_on_router(space_name, key, opts) if opts.bucket_id == nil then local primary_index_parts = space.index[0].parts - local sharding_key_data, err = sharding_metadata_module.fetch_sharding_key_on_router(space_name) + local sharding_key_data, err = sharding_metadata_module.fetch_sharding_key_on_router(vshard_router, space_name) if err ~= nil then return nil, err end diff --git a/crud/get.lua b/crud/get.lua index 0d84e134..0bffce48 100644 --- a/crud/get.lua +++ b/crud/get.lua @@ -84,7 +84,7 @@ local function call_get_on_router(space_name, key, opts) if opts.bucket_id == nil then local primary_index_parts = space.index[0].parts - local sharding_key_data, err = sharding_metadata_module.fetch_sharding_key_on_router(space_name) + local sharding_key_data, err = sharding_metadata_module.fetch_sharding_key_on_router(vshard_router, space_name) if err ~= nil then return nil, err end diff --git a/crud/select/compat/select.lua b/crud/select/compat/select.lua index e904793b..d16e0823 100644 --- a/crud/select/compat/select.lua +++ b/crud/select/compat/select.lua @@ -61,7 +61,7 @@ local function build_select_iterator(space_name, user_conditions, opts) -- We don't need sharding info if bucket_id specified. if opts.bucket_id == nil then - sharding_key_data, err = sharding_metadata_module.fetch_sharding_key_on_router(space_name) + sharding_key_data, err = sharding_metadata_module.fetch_sharding_key_on_router(vshard_router, space_name) if err ~= nil then return nil, err end diff --git a/crud/select/compat/select_old.lua b/crud/select/compat/select_old.lua index 928cac7e..2aea31b9 100644 --- a/crud/select/compat/select_old.lua +++ b/crud/select/compat/select_old.lua @@ -123,7 +123,7 @@ local function build_select_iterator(space_name, user_conditions, opts) local sharding_key_as_index_obj = nil -- We don't need sharding info if bucket_id specified. if opts.bucket_id == nil then - local sharding_key_data, err = sharding_metadata_module.fetch_sharding_key_on_router(space_name) + local sharding_key_data, err = sharding_metadata_module.fetch_sharding_key_on_router(vshard_router, space_name) if err ~= nil then return nil, err end diff --git a/crud/update.lua b/crud/update.lua index 4b1a18be..3ed058cc 100644 --- a/crud/update.lua +++ b/crud/update.lua @@ -109,7 +109,7 @@ local function call_update_on_router(space_name, key, user_operations, opts) if opts.bucket_id == nil then local primary_index_parts = space.index[0].parts - local sharding_key_data, err = sharding_metadata_module.fetch_sharding_key_on_router(space_name) + local sharding_key_data, err = sharding_metadata_module.fetch_sharding_key_on_router(vshard_router, space_name) if err ~= nil then return nil, err end diff --git a/test/helper.lua b/test/helper.lua index 99a83a13..1f9bebc7 100644 --- a/test/helper.lua +++ b/test/helper.lua @@ -334,9 +334,13 @@ end function helpers.get_sharding_key_cache(cluster) return cluster.main_server.net_box:eval([[ + local vshard = require('vshard') local sharding_metadata_cache = require('crud.common.sharding.router_metadata_cache') - return sharding_metadata_cache[sharding_metadata_cache.SHARDING_KEY_MAP_NAME] + local vshard_router = vshard.router.static + local cache = sharding_metadata_cache.get_instance(vshard_router) + + return cache[sharding_metadata_cache.SHARDING_KEY_MAP_NAME] ]]) end @@ -362,9 +366,13 @@ end -- but not the cache itself function helpers.get_sharding_func_cache_size(cluster) return cluster.main_server.net_box:eval([[ + local vshard = require('vshard') local sharding_metadata_cache = require('crud.common.sharding.router_metadata_cache') - local cache, err = sharding_metadata_cache[sharding_metadata_cache.SHARDING_FUNC_MAP_NAME] + local vshard_router = vshard.router.static + local instance_cache = sharding_metadata_cache.get_instance(vshard_router) + + local cache, err = instance_cache[sharding_metadata_cache.SHARDING_FUNC_MAP_NAME] if cache == nil then return nil, err end diff --git a/test/unit/sharding_metadata_test.lua b/test/unit/sharding_metadata_test.lua index e049c28e..ad321d1e 100644 --- a/test/unit/sharding_metadata_test.lua +++ b/test/unit/sharding_metadata_test.lua @@ -4,7 +4,7 @@ local sharding_metadata_module = require('crud.common.sharding.sharding_metadata local sharding_key_module = require('crud.common.sharding.sharding_key') local sharding_func_module = require('crud.common.sharding.sharding_func') local sharding_utils = require('crud.common.sharding.utils') -local cache = require('crud.common.sharding.router_metadata_cache') +local router_cache = require('crud.common.sharding.router_metadata_cache') local storage_cache = require('crud.common.sharding.storage_metadata_cache') local utils = require('crud.common.utils') @@ -56,7 +56,7 @@ g.after_each(function() end box.space.fetch_on_storage:drop() - cache.drop_caches() + router_cache.drop_caches() storage_cache.drop_caches() end) @@ -298,7 +298,8 @@ g.test_is_part_of_pk_positive = function() } local is_part_of_pk = sharding_key_module.internal.is_part_of_pk - local res = is_part_of_pk(space_name, index_parts, sharding_key_as_index_obj) + local cache = router_cache.get_instance({name = 'dummy'}) + local res = is_part_of_pk(cache, space_name, index_parts, sharding_key_as_index_obj) t.assert_equals(res, true) end @@ -315,7 +316,8 @@ g.test_is_part_of_pk_negative = function() } local is_part_of_pk = sharding_key_module.internal.is_part_of_pk - local res = is_part_of_pk(space_name, index_parts, sharding_key_as_index_obj) + local cache = router_cache.get_instance({name = 'dummy'}) + local res = is_part_of_pk(cache, space_name, index_parts, sharding_key_as_index_obj) t.assert_equals(res, false) end