Skip to content

Commit

Permalink
Separate common sharding metadata methods from sharding key methods
Browse files Browse the repository at this point in the history
It would be more efficient to get sharding keys and
sharding functions from storage in one `fetch_on_storage`
function call. So, this function is common for sharding
keys and sharding functions support. As well as functions
for fetching on router. These methods are introduced
in `sharding_metadata` module. Methods for working
with sharding key structure are introduced in
sharding key module.

Part of #237
  • Loading branch information
AnaNek committed Feb 7, 2022
1 parent 6e0fcc7 commit 891666a
Show file tree
Hide file tree
Showing 14 changed files with 135 additions and 76 deletions.
2 changes: 1 addition & 1 deletion crud/common/const.lua
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@ local const = {}

const.RELOAD_RETRIES_NUM = 1
const.RELOAD_SCHEMA_TIMEOUT = 3 -- 3 seconds
const.FETCH_SHARDING_KEY_TIMEOUT = 3 -- 3 seconds
const.FETCH_SHARDING_METADATA_TIMEOUT = 3 -- 3 seconds

return const
2 changes: 1 addition & 1 deletion crud/common/sharding/init.lua
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ function sharding.tuple_get_bucket_id(tuple, space, specified_bucket_id)
end

local sharding_index_parts = space.index[0].parts
local sharding_key_as_index_obj, err = sharding_metadata_module.fetch_on_router(space.name)
local sharding_key_as_index_obj, err = sharding_metadata_module.fetch_sharding_key_on_router(space.name)
if err ~= nil then
return nil, err
end
Expand Down
30 changes: 21 additions & 9 deletions crud/common/sharding/sharding_key.lua
Original file line number Diff line number Diff line change
Expand Up @@ -75,15 +75,9 @@ end

-- Extract sharding key from pk.
-- Returns a table with sharding key or pair of nil and error.
function sharding_key_module.extract_from_pk(space_name, primary_index_parts, primary_key, timeout)
dev_checks('string', 'table', '?', '?number')

local sharding_key_as_index_obj, err = require(
'crud.common.sharding.sharding_metadata'
).fetch_on_router(space_name, timeout)
if err ~= nil then
return nil, err
end
function sharding_key_module.extract_from_pk(space_name, sharding_key_as_index_obj, primary_index_parts, primary_key)
dev_checks('string', '?table', 'table', '?')

if sharding_key_as_index_obj == nil then
return primary_key
end
Expand All @@ -102,6 +96,24 @@ function sharding_key_module.extract_from_pk(space_name, primary_index_parts, pr
return extract_from_index(primary_key, primary_index_parts, sharding_key_as_index_obj)
end

function sharding_key_module.construct_as_index_obj_cache(metadata_map)
dev_checks('table')

cache.sharding_key_as_index_obj_map = {}
for space_name, metadata in pairs(metadata_map) do
if metadata.sharding_key_def ~= nil then
local sharding_key_as_index_obj, err = as_index_object(space_name,
metadata.space_format,
metadata.sharding_key_def)
if err ~= nil then
return err
end

cache.sharding_key_as_index_obj_map[space_name] = sharding_key_as_index_obj
end
end
end

sharding_key_module.internal = {
as_index_object = as_index_object,
extract_from_index = extract_from_index,
Expand Down
70 changes: 32 additions & 38 deletions crud/common/sharding/sharding_metadata.lua
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,13 @@ local call = require('crud.common.call')
local const = require('crud.common.const')
local dev_checks = require('crud.common.dev_checks')
local cache = require('crud.common.sharding.sharding_metadata_cache')
local sharding_key = require('crud.common.sharding.sharding_key')

local FetchShardingKeyError = errors.new_class('FetchShardingKeyError', {capture_stack = false})
local FetchShardingMetadataError = errors.new_class('FetchShardingMetadataError', {capture_stack = false})

local FETCH_FUNC_NAME = '_crud.fetch_on_storage'

local sharding_key_module = {}
local sharding_metadata_module = {}

-- Function decorator that is used to prevent _fetch_on_router() from being
-- called concurrently by different fibers.
Expand All @@ -25,8 +26,8 @@ local function locked(f)
-- first reason, I'm not sure we need to disclose to users such details
-- like problems with synchronization objects.
if not ok then
return FetchShardingKeyError:new(
"Timeout for fetching sharding key is exceeded")
return FetchShardingMetadataError:new(
"Timeout for fetching sharding metadata is exceeded")
end
local timeout = timeout_deadline - fiber.clock()
local status, err = pcall(f, timeout, ...)
Expand All @@ -39,7 +40,7 @@ end

-- Return a map with metadata or nil when space box.space._ddl_sharding_key is
-- not available on storage.
function sharding_key_module.fetch_on_storage()
function sharding_metadata_module.fetch_on_storage()
local sharding_key_space = box.space._ddl_sharding_key
if sharding_key_space == nil then
return nil
Expand Down Expand Up @@ -67,10 +68,10 @@ end
-- a sharding metadata by a single one, other fibers will wait while
-- cache.fetch_lock become unlocked during timeout passed to
-- _fetch_on_router().
local _fetch_on_router = locked(function(timeout)
dev_checks('number')
local _fetch_on_router = locked(function(timeout, space_name, metadata_map_name)
dev_checks('number', 'string', 'string')

if cache.sharding_key_as_index_obj_map ~= nil then
if cache[metadata_map_name] ~= nil then
return
end

Expand All @@ -81,21 +82,13 @@ local _fetch_on_router = locked(function(timeout)
return err
end
if metadata_map == nil then
cache.sharding_key_as_index_obj_map = {}
cache[cache.SHARDING_KEY_MAP_NAME] = {}
return
end

cache.sharding_key_as_index_obj_map = {}
for space_name, metadata in pairs(metadata_map) do
local sharding_key_as_index_obj, err = require(
'crud.common.sharding.sharding_key'
).internal.as_index_object(space_name,
metadata.space_format,
metadata.sharding_key_def)
if err ~= nil then
return err
end
cache.sharding_key_as_index_obj_map[space_name] = sharding_key_as_index_obj
local err = sharding_key.construct_as_index_obj_cache(metadata_map, space_name)
if err ~= nil then
return err
end
end)

Expand All @@ -108,37 +101,38 @@ end)
-- that nil without error is a successfull return value.
-- - nil and error, when something goes wrong on fetching attempt.
--
function sharding_key_module.fetch_on_router(space_name, timeout)
dev_checks('string', '?number')

if cache.sharding_key_as_index_obj_map ~= nil then
return cache.sharding_key_as_index_obj_map[space_name]
local function fetch_on_router(space_name, metadata_map_name, timeout)
if cache[metadata_map_name] ~= nil then
return cache[metadata_map_name][space_name]
end

local timeout = timeout or const.FETCH_SHARDING_KEY_TIMEOUT
local err = _fetch_on_router(timeout)
local timeout = timeout or const.FETCH_SHARDING_METADATA_TIMEOUT
local err = _fetch_on_router(timeout, space_name, metadata_map_name)
if err ~= nil then
if cache.sharding_key_as_index_obj_map ~= nil then
return cache.sharding_key_as_index_obj_map[space_name]
end
return nil, err
end

if cache.sharding_key_as_index_obj_map ~= nil then
return cache.sharding_key_as_index_obj_map[space_name]
if cache[metadata_map_name] ~= nil then
return cache[metadata_map_name][space_name]
end

return nil, FetchShardingKeyError:new(
return nil, FetchShardingMetadataError:new(
"Fetching sharding key for space '%s' is failed", space_name)
end

function sharding_key_module.update_cache(space_name)
function sharding_metadata_module.fetch_sharding_key_on_router(space_name, timeout)
dev_checks('string', '?number')

return fetch_on_router(space_name, cache.SHARDING_KEY_MAP_NAME, timeout)
end

function sharding_metadata_module.update_sharding_key_cache(space_name)
cache.drop_caches()
return sharding_key_module.fetch_on_router(space_name)
return sharding_metadata_module.fetch_sharding_key_on_router(space_name)
end

function sharding_key_module.init()
_G._crud.fetch_on_storage = sharding_key_module.fetch_on_storage
function sharding_metadata_module.init()
_G._crud.fetch_on_storage = sharding_metadata_module.fetch_on_storage
end

return sharding_key_module
return sharding_metadata_module
23 changes: 12 additions & 11 deletions crud/common/sharding/sharding_metadata_cache.lua
Original file line number Diff line number Diff line change
@@ -1,18 +1,19 @@
local fiber = require('fiber')

local sharding_key_cache = {}
local sharding_metadata_cache = {}

sharding_key_cache.sharding_key_as_index_obj_map = nil
sharding_key_cache.fetch_lock = fiber.channel(1)
sharding_key_cache.is_part_of_pk = {}
sharding_metadata_cache.SHARDING_KEY_MAP_NAME = "sharding_key_as_index_obj_map"
sharding_metadata_cache[sharding_metadata_cache.SHARDING_KEY_MAP_NAME] = nil
sharding_metadata_cache.fetch_lock = fiber.channel(1)
sharding_metadata_cache.is_part_of_pk = {}

function sharding_key_cache.drop_caches()
sharding_key_cache.sharding_key_as_index_obj_map = nil
if sharding_key_cache.fetch_lock ~= nil then
sharding_key_cache.fetch_lock:close()
function sharding_metadata_cache.drop_caches()
sharding_metadata_cache[sharding_metadata_cache.SHARDING_KEY_MAP_NAME] = nil
if sharding_metadata_cache.fetch_lock ~= nil then
sharding_metadata_cache.fetch_lock:close()
end
sharding_key_cache.fetch_lock = fiber.channel(1)
sharding_key_cache.is_part_of_pk = {}
sharding_metadata_cache.fetch_lock = fiber.channel(1)
sharding_metadata_cache.is_part_of_pk = {}
end

return sharding_key_cache
return sharding_metadata_cache
9 changes: 9 additions & 0 deletions crud/common/sharding_key.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
local sharding_metadata_module = require('crud.common.sharding.sharding_metadata')

local sharding_key_cache = {}

function sharding_key_cache.update_cache(space_name)
return sharding_metadata_module.update_sharding_key_cache(space_name)
end

return sharding_key_cache
12 changes: 10 additions & 2 deletions crud/delete.lua
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ local call = require('crud.common.call')
local utils = require('crud.common.utils')
local sharding = require('crud.common.sharding')
local sharding_key_module = require('crud.common.sharding.sharding_key')
local sharding_metadata_module = require('crud.common.sharding.sharding_metadata')
local dev_checks = require('crud.common.dev_checks')
local schema = require('crud.common.schema')

Expand Down Expand Up @@ -58,9 +59,16 @@ local function call_delete_on_router(space_name, key, opts)

local sharding_key = key
if opts.bucket_id == nil then
local err
local primary_index_parts = space.index[0].parts
sharding_key, err = sharding_key_module.extract_from_pk(space_name, primary_index_parts, key, opts.timeout)

local sharding_key_as_index_obj, err = sharding_metadata_module.fetch_sharding_key_on_router(space_name)
if err ~= nil then
return nil, err
end

sharding_key, err = sharding_key_module.extract_from_pk(space_name,
sharding_key_as_index_obj,
primary_index_parts, key)
if err ~= nil then
return nil, err
end
Expand Down
12 changes: 10 additions & 2 deletions crud/get.lua
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ local call = require('crud.common.call')
local utils = require('crud.common.utils')
local sharding = require('crud.common.sharding')
local sharding_key_module = require('crud.common.sharding.sharding_key')
local sharding_metadata_module = require('crud.common.sharding.sharding_metadata')
local dev_checks = require('crud.common.dev_checks')
local schema = require('crud.common.schema')

Expand Down Expand Up @@ -61,9 +62,16 @@ local function call_get_on_router(space_name, key, opts)

local sharding_key = key
if opts.bucket_id == nil then
local err
local primary_index_parts = space.index[0].parts
sharding_key, err = sharding_key_module.extract_from_pk(space_name, primary_index_parts, key, opts.timeout)

local sharding_key_as_index_obj, err = sharding_metadata_module.fetch_sharding_key_on_router(space_name)
if err ~= nil then
return nil, err
end

sharding_key, err = sharding_key_module.extract_from_pk(space_name,
sharding_key_as_index_obj,
primary_index_parts, key)
if err ~= nil then
return nil, err
end
Expand Down
2 changes: 1 addition & 1 deletion crud/select/compat/select.lua
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ local function build_select_iterator(space_name, user_conditions, opts)
return nil, SelectError:new("Space %q doesn't exist", space_name), true
end
local space_format = space:format()
local sharding_key_as_index_obj, err = sharding_metadata_module.fetch_on_router(space_name)
local sharding_key_as_index_obj, err = sharding_metadata_module.fetch_sharding_key_on_router(space_name)
if err ~= nil then
return nil, err
end
Expand Down
2 changes: 1 addition & 1 deletion crud/select/compat/select_old.lua
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ local function build_select_iterator(space_name, user_conditions, opts)
return nil, SelectError:new("Space %q doesn't exist", space_name), true
end
local space_format = space:format()
local sharding_key_as_index_obj, err = sharding_metadata_module.fetch_on_router(space_name)
local sharding_key_as_index_obj, err = sharding_metadata_module.fetch_sharding_key_on_router(space_name)
if err ~= nil then
return nil, err
end
Expand Down
12 changes: 10 additions & 2 deletions crud/update.lua
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ local call = require('crud.common.call')
local utils = require('crud.common.utils')
local sharding = require('crud.common.sharding')
local sharding_key_module = require('crud.common.sharding.sharding_key')
local sharding_metadata_module = require('crud.common.sharding.sharding_metadata')
local dev_checks = require('crud.common.dev_checks')
local schema = require('crud.common.schema')

Expand Down Expand Up @@ -86,9 +87,16 @@ local function call_update_on_router(space_name, key, user_operations, opts)

local sharding_key = key
if opts.bucket_id == nil then
local err
local primary_index_parts = space.index[0].parts
sharding_key, err = sharding_key_module.extract_from_pk(space_name, primary_index_parts, key, opts.timeout)

local sharding_key_as_index_obj, err = sharding_metadata_module.fetch_sharding_key_on_router(space_name)
if err ~= nil then
return nil, err
end

sharding_key, err = sharding_key_module.extract_from_pk(space_name,
sharding_key_as_index_obj,
primary_index_parts, key)
if err ~= nil then
return nil, err
end
Expand Down
6 changes: 3 additions & 3 deletions test/helper.lua
Original file line number Diff line number Diff line change
Expand Up @@ -323,12 +323,12 @@ function helpers.tarantool_version_at_least(wanted_major, wanted_minor,
return true
end

function helpers.update_cache(cluster, space_name)
function helpers.update_sharding_key_cache(cluster, space_name)
return cluster.main_server.net_box:eval([[
local sharding_metadata = require('crud.common.sharding.sharding_metadata')
local sharding_key = require('crud.common.sharding_key')
local space_name = ...
return sharding_metadata.update_cache(space_name)
return sharding_key.update_cache(space_name)
]], {space_name})
end

Expand Down
9 changes: 6 additions & 3 deletions test/integration/ddl_sharding_key_test.lua
Original file line number Diff line number Diff line change
Expand Up @@ -657,19 +657,22 @@ end

pgroup.test_update_cache = function(g)
local space_name = 'customers_name_key'
local sharding_key_as_index_obj = helpers.update_cache(g.cluster, space_name)
local sharding_key_as_index_obj, err = helpers.update_sharding_key_cache(g.cluster, space_name)
t.assert_equals(err, nil)
t.assert_equals(sharding_key_as_index_obj, {parts = {{fieldno = 3}}})

helpers.call_on_servers(g.cluster, {'s1-master', 's2-master'}, function(server)
server.net_box:call('set_sharding_key', {space_name, {'age'}})
end)
sharding_key_as_index_obj = helpers.update_cache(g.cluster, space_name)
sharding_key_as_index_obj, err = helpers.update_sharding_key_cache(g.cluster, space_name)
t.assert_equals(err, nil)
t.assert_equals(sharding_key_as_index_obj, {parts = {{fieldno = 4}}})

-- Recover sharding key.
helpers.call_on_servers(g.cluster, {'s1-master', 's2-master'}, function(server)
server.net_box:call('set_sharding_key', {space_name, {'name'}})
end)
sharding_key_as_index_obj = helpers.update_cache(g.cluster, space_name)
sharding_key_as_index_obj, err = helpers.update_sharding_key_cache(g.cluster, space_name)
t.assert_equals(err, nil)
t.assert_equals(sharding_key_as_index_obj, {parts = {{fieldno = 3}}})
end
Loading

0 comments on commit 891666a

Please sign in to comment.