From 891666a84b12c0d4d7a2cc3911e24ea9e6f51429 Mon Sep 17 00:00:00 2001 From: AnaNek Date: Wed, 22 Dec 2021 12:59:18 +0300 Subject: [PATCH] Separate common sharding metadata methods from sharding key methods It would be more efficient to get sharding keys and sharding functions from storage in one `fetch_on_storage` function call. So, this function is common for sharding keys and sharding functions support. As well as functions for fetching on router. These methods are introduced in `sharding_metadata` module. Methods for working with sharding key structure are introduced in sharding key module. Part of #237 --- crud/common/const.lua | 2 +- crud/common/sharding/init.lua | 2 +- crud/common/sharding/sharding_key.lua | 30 +++++--- crud/common/sharding/sharding_metadata.lua | 70 +++++++++---------- .../sharding/sharding_metadata_cache.lua | 23 +++--- crud/common/sharding_key.lua | 9 +++ crud/delete.lua | 12 +++- crud/get.lua | 12 +++- crud/select/compat/select.lua | 2 +- crud/select/compat/select_old.lua | 2 +- crud/update.lua | 12 +++- test/helper.lua | 6 +- test/integration/ddl_sharding_key_test.lua | 9 ++- test/unit/sharding_metadata_test.lua | 20 +++++- 14 files changed, 135 insertions(+), 76 deletions(-) create mode 100644 crud/common/sharding_key.lua diff --git a/crud/common/const.lua b/crud/common/const.lua index e546dddc5..fc261269e 100644 --- a/crud/common/const.lua +++ b/crud/common/const.lua @@ -2,6 +2,6 @@ local const = {} const.RELOAD_RETRIES_NUM = 1 const.RELOAD_SCHEMA_TIMEOUT = 3 -- 3 seconds -const.FETCH_SHARDING_KEY_TIMEOUT = 3 -- 3 seconds +const.FETCH_SHARDING_METADATA_TIMEOUT = 3 -- 3 seconds return const diff --git a/crud/common/sharding/init.lua b/crud/common/sharding/init.lua index a580e2dec..b47c5e88e 100644 --- a/crud/common/sharding/init.lua +++ b/crud/common/sharding/init.lua @@ -22,7 +22,7 @@ function sharding.tuple_get_bucket_id(tuple, space, specified_bucket_id) end local sharding_index_parts = space.index[0].parts - local sharding_key_as_index_obj, err = sharding_metadata_module.fetch_on_router(space.name) + local sharding_key_as_index_obj, err = sharding_metadata_module.fetch_sharding_key_on_router(space.name) if err ~= nil then return nil, err end diff --git a/crud/common/sharding/sharding_key.lua b/crud/common/sharding/sharding_key.lua index 9a9134cae..dc5af1a83 100644 --- a/crud/common/sharding/sharding_key.lua +++ b/crud/common/sharding/sharding_key.lua @@ -75,15 +75,9 @@ end -- Extract sharding key from pk. -- Returns a table with sharding key or pair of nil and error. -function sharding_key_module.extract_from_pk(space_name, primary_index_parts, primary_key, timeout) - dev_checks('string', 'table', '?', '?number') - - local sharding_key_as_index_obj, err = require( - 'crud.common.sharding.sharding_metadata' - ).fetch_on_router(space_name, timeout) - if err ~= nil then - return nil, err - end +function sharding_key_module.extract_from_pk(space_name, sharding_key_as_index_obj, primary_index_parts, primary_key) + dev_checks('string', '?table', 'table', '?') + if sharding_key_as_index_obj == nil then return primary_key end @@ -102,6 +96,24 @@ function sharding_key_module.extract_from_pk(space_name, primary_index_parts, pr return extract_from_index(primary_key, primary_index_parts, sharding_key_as_index_obj) end +function sharding_key_module.construct_as_index_obj_cache(metadata_map) + dev_checks('table') + + cache.sharding_key_as_index_obj_map = {} + for space_name, metadata in pairs(metadata_map) do + if metadata.sharding_key_def ~= nil then + local sharding_key_as_index_obj, err = as_index_object(space_name, + metadata.space_format, + metadata.sharding_key_def) + if err ~= nil then + return err + end + + cache.sharding_key_as_index_obj_map[space_name] = sharding_key_as_index_obj + end + end +end + sharding_key_module.internal = { as_index_object = as_index_object, extract_from_index = extract_from_index, diff --git a/crud/common/sharding/sharding_metadata.lua b/crud/common/sharding/sharding_metadata.lua index f19b06e94..86dc0a960 100644 --- a/crud/common/sharding/sharding_metadata.lua +++ b/crud/common/sharding/sharding_metadata.lua @@ -5,12 +5,13 @@ local call = require('crud.common.call') local const = require('crud.common.const') local dev_checks = require('crud.common.dev_checks') local cache = require('crud.common.sharding.sharding_metadata_cache') +local sharding_key = require('crud.common.sharding.sharding_key') -local FetchShardingKeyError = errors.new_class('FetchShardingKeyError', {capture_stack = false}) +local FetchShardingMetadataError = errors.new_class('FetchShardingMetadataError', {capture_stack = false}) local FETCH_FUNC_NAME = '_crud.fetch_on_storage' -local sharding_key_module = {} +local sharding_metadata_module = {} -- Function decorator that is used to prevent _fetch_on_router() from being -- called concurrently by different fibers. @@ -25,8 +26,8 @@ local function locked(f) -- first reason, I'm not sure we need to disclose to users such details -- like problems with synchronization objects. if not ok then - return FetchShardingKeyError:new( - "Timeout for fetching sharding key is exceeded") + return FetchShardingMetadataError:new( + "Timeout for fetching sharding metadata is exceeded") end local timeout = timeout_deadline - fiber.clock() local status, err = pcall(f, timeout, ...) @@ -39,7 +40,7 @@ end -- Return a map with metadata or nil when space box.space._ddl_sharding_key is -- not available on storage. -function sharding_key_module.fetch_on_storage() +function sharding_metadata_module.fetch_on_storage() local sharding_key_space = box.space._ddl_sharding_key if sharding_key_space == nil then return nil @@ -67,10 +68,10 @@ end -- a sharding metadata by a single one, other fibers will wait while -- cache.fetch_lock become unlocked during timeout passed to -- _fetch_on_router(). -local _fetch_on_router = locked(function(timeout) - dev_checks('number') +local _fetch_on_router = locked(function(timeout, space_name, metadata_map_name) + dev_checks('number', 'string', 'string') - if cache.sharding_key_as_index_obj_map ~= nil then + if cache[metadata_map_name] ~= nil then return end @@ -81,21 +82,13 @@ local _fetch_on_router = locked(function(timeout) return err end if metadata_map == nil then - cache.sharding_key_as_index_obj_map = {} + cache[cache.SHARDING_KEY_MAP_NAME] = {} return end - cache.sharding_key_as_index_obj_map = {} - for space_name, metadata in pairs(metadata_map) do - local sharding_key_as_index_obj, err = require( - 'crud.common.sharding.sharding_key' - ).internal.as_index_object(space_name, - metadata.space_format, - metadata.sharding_key_def) - if err ~= nil then - return err - end - cache.sharding_key_as_index_obj_map[space_name] = sharding_key_as_index_obj + local err = sharding_key.construct_as_index_obj_cache(metadata_map, space_name) + if err ~= nil then + return err end end) @@ -108,37 +101,38 @@ end) -- that nil without error is a successfull return value. -- - nil and error, when something goes wrong on fetching attempt. -- -function sharding_key_module.fetch_on_router(space_name, timeout) - dev_checks('string', '?number') - - if cache.sharding_key_as_index_obj_map ~= nil then - return cache.sharding_key_as_index_obj_map[space_name] +local function fetch_on_router(space_name, metadata_map_name, timeout) + if cache[metadata_map_name] ~= nil then + return cache[metadata_map_name][space_name] end - local timeout = timeout or const.FETCH_SHARDING_KEY_TIMEOUT - local err = _fetch_on_router(timeout) + local timeout = timeout or const.FETCH_SHARDING_METADATA_TIMEOUT + local err = _fetch_on_router(timeout, space_name, metadata_map_name) if err ~= nil then - if cache.sharding_key_as_index_obj_map ~= nil then - return cache.sharding_key_as_index_obj_map[space_name] - end return nil, err end - if cache.sharding_key_as_index_obj_map ~= nil then - return cache.sharding_key_as_index_obj_map[space_name] + if cache[metadata_map_name] ~= nil then + return cache[metadata_map_name][space_name] end - return nil, FetchShardingKeyError:new( + return nil, FetchShardingMetadataError:new( "Fetching sharding key for space '%s' is failed", space_name) end -function sharding_key_module.update_cache(space_name) +function sharding_metadata_module.fetch_sharding_key_on_router(space_name, timeout) + dev_checks('string', '?number') + + return fetch_on_router(space_name, cache.SHARDING_KEY_MAP_NAME, timeout) +end + +function sharding_metadata_module.update_sharding_key_cache(space_name) cache.drop_caches() - return sharding_key_module.fetch_on_router(space_name) + return sharding_metadata_module.fetch_sharding_key_on_router(space_name) end -function sharding_key_module.init() - _G._crud.fetch_on_storage = sharding_key_module.fetch_on_storage +function sharding_metadata_module.init() + _G._crud.fetch_on_storage = sharding_metadata_module.fetch_on_storage end -return sharding_key_module +return sharding_metadata_module diff --git a/crud/common/sharding/sharding_metadata_cache.lua b/crud/common/sharding/sharding_metadata_cache.lua index a1ab39652..77c775940 100644 --- a/crud/common/sharding/sharding_metadata_cache.lua +++ b/crud/common/sharding/sharding_metadata_cache.lua @@ -1,18 +1,19 @@ local fiber = require('fiber') -local sharding_key_cache = {} +local sharding_metadata_cache = {} -sharding_key_cache.sharding_key_as_index_obj_map = nil -sharding_key_cache.fetch_lock = fiber.channel(1) -sharding_key_cache.is_part_of_pk = {} +sharding_metadata_cache.SHARDING_KEY_MAP_NAME = "sharding_key_as_index_obj_map" +sharding_metadata_cache[sharding_metadata_cache.SHARDING_KEY_MAP_NAME] = nil +sharding_metadata_cache.fetch_lock = fiber.channel(1) +sharding_metadata_cache.is_part_of_pk = {} -function sharding_key_cache.drop_caches() - sharding_key_cache.sharding_key_as_index_obj_map = nil - if sharding_key_cache.fetch_lock ~= nil then - sharding_key_cache.fetch_lock:close() +function sharding_metadata_cache.drop_caches() + sharding_metadata_cache[sharding_metadata_cache.SHARDING_KEY_MAP_NAME] = nil + if sharding_metadata_cache.fetch_lock ~= nil then + sharding_metadata_cache.fetch_lock:close() end - sharding_key_cache.fetch_lock = fiber.channel(1) - sharding_key_cache.is_part_of_pk = {} + sharding_metadata_cache.fetch_lock = fiber.channel(1) + sharding_metadata_cache.is_part_of_pk = {} end -return sharding_key_cache +return sharding_metadata_cache diff --git a/crud/common/sharding_key.lua b/crud/common/sharding_key.lua new file mode 100644 index 000000000..eba2133f3 --- /dev/null +++ b/crud/common/sharding_key.lua @@ -0,0 +1,9 @@ +local sharding_metadata_module = require('crud.common.sharding.sharding_metadata') + +local sharding_key_cache = {} + +function sharding_key_cache.update_cache(space_name) + return sharding_metadata_module.update_sharding_key_cache(space_name) +end + +return sharding_key_cache diff --git a/crud/delete.lua b/crud/delete.lua index 623682fa1..5fad9ea1e 100644 --- a/crud/delete.lua +++ b/crud/delete.lua @@ -6,6 +6,7 @@ local call = require('crud.common.call') local utils = require('crud.common.utils') local sharding = require('crud.common.sharding') local sharding_key_module = require('crud.common.sharding.sharding_key') +local sharding_metadata_module = require('crud.common.sharding.sharding_metadata') local dev_checks = require('crud.common.dev_checks') local schema = require('crud.common.schema') @@ -58,9 +59,16 @@ local function call_delete_on_router(space_name, key, opts) local sharding_key = key if opts.bucket_id == nil then - local err local primary_index_parts = space.index[0].parts - sharding_key, err = sharding_key_module.extract_from_pk(space_name, primary_index_parts, key, opts.timeout) + + local sharding_key_as_index_obj, err = sharding_metadata_module.fetch_sharding_key_on_router(space_name) + if err ~= nil then + return nil, err + end + + sharding_key, err = sharding_key_module.extract_from_pk(space_name, + sharding_key_as_index_obj, + primary_index_parts, key) if err ~= nil then return nil, err end diff --git a/crud/get.lua b/crud/get.lua index 9d0f7b376..e28538829 100644 --- a/crud/get.lua +++ b/crud/get.lua @@ -6,6 +6,7 @@ local call = require('crud.common.call') local utils = require('crud.common.utils') local sharding = require('crud.common.sharding') local sharding_key_module = require('crud.common.sharding.sharding_key') +local sharding_metadata_module = require('crud.common.sharding.sharding_metadata') local dev_checks = require('crud.common.dev_checks') local schema = require('crud.common.schema') @@ -61,9 +62,16 @@ local function call_get_on_router(space_name, key, opts) local sharding_key = key if opts.bucket_id == nil then - local err local primary_index_parts = space.index[0].parts - sharding_key, err = sharding_key_module.extract_from_pk(space_name, primary_index_parts, key, opts.timeout) + + local sharding_key_as_index_obj, err = sharding_metadata_module.fetch_sharding_key_on_router(space_name) + if err ~= nil then + return nil, err + end + + sharding_key, err = sharding_key_module.extract_from_pk(space_name, + sharding_key_as_index_obj, + primary_index_parts, key) if err ~= nil then return nil, err end diff --git a/crud/select/compat/select.lua b/crud/select/compat/select.lua index 0c1e62b25..29a7ac554 100644 --- a/crud/select/compat/select.lua +++ b/crud/select/compat/select.lua @@ -51,7 +51,7 @@ local function build_select_iterator(space_name, user_conditions, opts) return nil, SelectError:new("Space %q doesn't exist", space_name), true end local space_format = space:format() - local sharding_key_as_index_obj, err = sharding_metadata_module.fetch_on_router(space_name) + local sharding_key_as_index_obj, err = sharding_metadata_module.fetch_sharding_key_on_router(space_name) if err ~= nil then return nil, err end diff --git a/crud/select/compat/select_old.lua b/crud/select/compat/select_old.lua index 05018dd8f..d696e21e7 100644 --- a/crud/select/compat/select_old.lua +++ b/crud/select/compat/select_old.lua @@ -103,7 +103,7 @@ local function build_select_iterator(space_name, user_conditions, opts) return nil, SelectError:new("Space %q doesn't exist", space_name), true end local space_format = space:format() - local sharding_key_as_index_obj, err = sharding_metadata_module.fetch_on_router(space_name) + local sharding_key_as_index_obj, err = sharding_metadata_module.fetch_sharding_key_on_router(space_name) if err ~= nil then return nil, err end diff --git a/crud/update.lua b/crud/update.lua index 0708233cd..124877079 100644 --- a/crud/update.lua +++ b/crud/update.lua @@ -6,6 +6,7 @@ local call = require('crud.common.call') local utils = require('crud.common.utils') local sharding = require('crud.common.sharding') local sharding_key_module = require('crud.common.sharding.sharding_key') +local sharding_metadata_module = require('crud.common.sharding.sharding_metadata') local dev_checks = require('crud.common.dev_checks') local schema = require('crud.common.schema') @@ -86,9 +87,16 @@ local function call_update_on_router(space_name, key, user_operations, opts) local sharding_key = key if opts.bucket_id == nil then - local err local primary_index_parts = space.index[0].parts - sharding_key, err = sharding_key_module.extract_from_pk(space_name, primary_index_parts, key, opts.timeout) + + local sharding_key_as_index_obj, err = sharding_metadata_module.fetch_sharding_key_on_router(space_name) + if err ~= nil then + return nil, err + end + + sharding_key, err = sharding_key_module.extract_from_pk(space_name, + sharding_key_as_index_obj, + primary_index_parts, key) if err ~= nil then return nil, err end diff --git a/test/helper.lua b/test/helper.lua index 6661a37a3..f46bf7017 100644 --- a/test/helper.lua +++ b/test/helper.lua @@ -323,12 +323,12 @@ function helpers.tarantool_version_at_least(wanted_major, wanted_minor, return true end -function helpers.update_cache(cluster, space_name) +function helpers.update_sharding_key_cache(cluster, space_name) return cluster.main_server.net_box:eval([[ - local sharding_metadata = require('crud.common.sharding.sharding_metadata') + local sharding_key = require('crud.common.sharding_key') local space_name = ... - return sharding_metadata.update_cache(space_name) + return sharding_key.update_cache(space_name) ]], {space_name}) end diff --git a/test/integration/ddl_sharding_key_test.lua b/test/integration/ddl_sharding_key_test.lua index 1fa3bdfe5..30ca432fe 100644 --- a/test/integration/ddl_sharding_key_test.lua +++ b/test/integration/ddl_sharding_key_test.lua @@ -657,19 +657,22 @@ end pgroup.test_update_cache = function(g) local space_name = 'customers_name_key' - local sharding_key_as_index_obj = helpers.update_cache(g.cluster, space_name) + local sharding_key_as_index_obj, err = helpers.update_sharding_key_cache(g.cluster, space_name) + t.assert_equals(err, nil) t.assert_equals(sharding_key_as_index_obj, {parts = {{fieldno = 3}}}) helpers.call_on_servers(g.cluster, {'s1-master', 's2-master'}, function(server) server.net_box:call('set_sharding_key', {space_name, {'age'}}) end) - sharding_key_as_index_obj = helpers.update_cache(g.cluster, space_name) + sharding_key_as_index_obj, err = helpers.update_sharding_key_cache(g.cluster, space_name) + t.assert_equals(err, nil) t.assert_equals(sharding_key_as_index_obj, {parts = {{fieldno = 4}}}) -- Recover sharding key. helpers.call_on_servers(g.cluster, {'s1-master', 's2-master'}, function(server) server.net_box:call('set_sharding_key', {space_name, {'name'}}) end) - sharding_key_as_index_obj = helpers.update_cache(g.cluster, space_name) + sharding_key_as_index_obj, err = helpers.update_sharding_key_cache(g.cluster, space_name) + t.assert_equals(err, nil) t.assert_equals(sharding_key_as_index_obj, {parts = {{fieldno = 3}}}) end diff --git a/test/unit/sharding_metadata_test.lua b/test/unit/sharding_metadata_test.lua index 1fa4ed805..dad79e7fb 100644 --- a/test/unit/sharding_metadata_test.lua +++ b/test/unit/sharding_metadata_test.lua @@ -83,7 +83,23 @@ g.test_get_format_fieldno_map = function() t.assert_equals(fieldno_map, {age = 3, id = 1, name = 2}) end -g.test_fetch_on_storage_positive = function() +g.test_fetch_sharding_metadata_on_storage_positive = function() + local space_name = 'fetch_on_storage' + local sharding_key_def = {'name', 'age'} + + box.space._ddl_sharding_key:insert({space_name, sharding_key_def}) + + local metadata_map = sharding_metadata_module.fetch_on_storage() + + t.assert_equals(metadata_map, { + [space_name] = { + sharding_key_def = sharding_key_def, + space_format = {} + }, + }) +end + +g.test_fetch_sharding_key_on_storage_positive = function() local space_name = 'fetch_on_storage' local sharding_key_def = {'name', 'age'} box.space._ddl_sharding_key:insert({space_name, sharding_key_def}) @@ -98,7 +114,7 @@ g.test_fetch_on_storage_positive = function() }) end -g.test_fetch_on_storage_negative = function() +g.test_fetch_sharding_metadata_on_storage_negative = function() -- Test checks return value when _ddl_sharding_key is absent. box.space._ddl_sharding_key:drop()