Skip to content

Commit

Permalink
Add support of custom sharding func for crud methods
Browse files Browse the repository at this point in the history
This commit introduces modifications in functions
for fetching sharding metadata on storage and router
to get sharding function. Function `sharding.key_get_bucket_id`
calculates bucket_id using DDL sharding function if
sharding function exist for specified space. Description in
documentation, integration and unit tests are added
as well.

Closes #237
  • Loading branch information
AnaNek committed Jan 28, 2022
1 parent a4ea142 commit c63b589
Show file tree
Hide file tree
Showing 17 changed files with 795 additions and 30 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
key specified with DDL schema or in `_ddl_sharding_key` space.
NOTE: CRUD methods delete(), get() and update() requires that sharding key
must be a part of primary key.
* Support bucket id calculating using sharding func specified in
DDL schema or in `_ddl_sharding_func` space.

### Fixed

Expand Down
19 changes: 19 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,25 @@ documentation). As soon as sharding key for a certain space is available in
automatically. Note that CRUD methods `delete()`, `get()` and `update()`
requires that sharding key must be a part of primary key.

You can specify sharding function to calculate bucket_id with
sharding func definition as a part of [DDL
schema](https://github.com/tarantool/ddl#input-data-format)
or insert manually to the space `_ddl_sharding_func`.

CRUD uses `strcrc32` as sharding function by default.
The reason why using of `strcrc32` is undesirable is that
this sharding function is not consistent for cdata numbers.
In particular, it returns 3 different values for normal Lua
numbers like 123, for `unsigned long long` cdata
(like `123ULL`, or `ffi.cast('unsigned long long',
123)`), and for `signed long long` cdata (like `123LL`, or
`ffi.cast('long long', 123)`).

We cannot change default sharding function `strcrc32`
due to backward compatibility concerns, but please consider
using better alternatives for sharding function.
`mpcrc32` is one of them.

Table below describe what operations supports custom sharding key:

| CRUD method | Sharding key support |
Expand Down
16 changes: 14 additions & 2 deletions crud/common/sharding/init.lua
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,27 @@ local errors = require('errors')
local BucketIDError = errors.new_class("BucketIDError", {capture_stack = false})

local utils = require('crud.common.utils')
local dev_checks = require('crud.common.dev_checks')
local sharding_metadata_module = require('crud.common.sharding.sharding_metadata')

local sharding = {}

function sharding.key_get_bucket_id(key, specified_bucket_id)
function sharding.key_get_bucket_id(space_name, key, specified_bucket_id)
dev_checks('string', '?', '?number|cdata')

if specified_bucket_id ~= nil then
return specified_bucket_id
end

local sharding_func, err = sharding_metadata_module.fetch_sharding_func_on_router(space_name)
if err ~= nil then
return nil, err
end

if sharding_func ~= nil then
return sharding_func(key)
end

return vshard.router.bucket_id_strcrc32(key)
end

Expand All @@ -31,7 +43,7 @@ function sharding.tuple_get_bucket_id(tuple, space, specified_bucket_id)
end
local key = utils.extract_key(tuple, sharding_index_parts)

return sharding.key_get_bucket_id(key)
return sharding.key_get_bucket_id(space.name, key)
end

function sharding.tuple_set_and_return_bucket_id(tuple, space, specified_bucket_id)
Expand Down
26 changes: 26 additions & 0 deletions crud/common/sharding/sharding_func.lua
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,32 @@ local function as_callable_object(sharding_func_def, space_name)
)
end

function sharding_func_module.construct_as_callable_obj_cache(metadata_map, specified_space_name)
dev_checks('table', 'string')

local result_err

cache.sharding_func_map = {}
for space_name, metadata in pairs(metadata_map) do
if metadata.sharding_func_def ~= nil then
local sharding_func, err = as_callable_object(metadata.sharding_func_def,
space_name)
if err ~= nil then
if specified_space_name == space_name then
result_err = err
log.error(err)
else
log.warn(err)
end
end

cache.sharding_func_map[space_name] = sharding_func
end
end

return result_err
end

sharding_func_module.internal = {
as_callable_object = as_callable_object,
is_callable = is_callable,
Expand Down
81 changes: 61 additions & 20 deletions crud/common/sharding/sharding_metadata.lua
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ local call = require('crud.common.call')
local const = require('crud.common.const')
local dev_checks = require('crud.common.dev_checks')
local cache = require('crud.common.sharding.sharding_metadata_cache')
local sharding_func = require('crud.common.sharding.sharding_func')
local sharding_key = require('crud.common.sharding.sharding_key')

local FetchShardingMetadataError = errors.new_class('FetchShardingMetadataError', {capture_stack = false})
Expand Down Expand Up @@ -57,25 +58,39 @@ local function extract_sharding_func_def(tuple)
return nil
end

-- Return a map with metadata or nil when space box.space._ddl_sharding_key is
-- not available on storage.
-- Return a map with metadata or nil when spaces box.space._ddl_sharding_key and
-- box.space._ddl_sharding_func are not available on storage.
function sharding_metadata_module.fetch_on_storage()
local sharding_key_space = box.space._ddl_sharding_key
if sharding_key_space == nil then
local sharding_func_space = box.space._ddl_sharding_func

if sharding_key_space == nil and sharding_func_space == nil then
return nil
end

local SPACE_NAME_FIELDNO = 1
local SPACE_SHARDING_KEY_FIELDNO = 2
local metadata_map = {}
for _, tuple in sharding_key_space:pairs() do
local space_name = tuple[SPACE_NAME_FIELDNO]
local sharding_key_def = tuple[SPACE_SHARDING_KEY_FIELDNO]
local space_format = box.space[space_name]:format()
metadata_map[space_name] = {
sharding_key_def = sharding_key_def,
space_format = space_format,
}

if sharding_key_space ~= nil then
for _, tuple in sharding_key_space:pairs() do
local space_name = tuple[SPACE_NAME_FIELDNO]
local sharding_key_def = tuple[SPACE_SHARDING_KEY_FIELDNO]
local space_format = box.space[space_name]:format()
metadata_map[space_name] = {
sharding_key_def = sharding_key_def,
space_format = space_format,
}
end
end

if sharding_func_space ~= nil then
for _, tuple in sharding_func_space:pairs() do
local space_name = tuple[SPACE_NAME_FIELDNO]
local sharding_func_def = extract_sharding_func_def(tuple)
metadata_map[space_name] = metadata_map[space_name] or {}
metadata_map[space_name].sharding_func_def = sharding_func_def
end
end

return metadata_map
Expand All @@ -102,24 +117,21 @@ local _fetch_on_router = locked(function(timeout, space_name, metadata_map_name)
end
if metadata_map == nil then
cache[cache.SHARDING_KEY_MAP_NAME] = {}
cache[cache.SHARDING_FUNC_MAP_NAME] = {}
return
end

local err = sharding_key.construct_as_index_obj_cache(metadata_map, space_name)
if err ~= nil then
return err
end

local err = sharding_func.construct_as_callable_obj_cache(metadata_map, space_name)
if err ~= nil then
return err
end
end)

-- Get sharding index for a certain space.
--
-- Return:
-- - sharding key as index object, when sharding key definition found on
-- storage.
-- - nil, when sharding key definition was not found on storage. Pay attention
-- that nil without error is a successfull return value.
-- - nil and error, when something goes wrong on fetching attempt.
--
local function fetch_on_router(space_name, metadata_map_name, timeout)
if cache[metadata_map_name] ~= nil then
return cache[metadata_map_name][space_name]
Expand All @@ -139,17 +151,46 @@ local function fetch_on_router(space_name, metadata_map_name, timeout)
"Fetching sharding key for space '%s' is failed", space_name)
end

-- Get sharding index for a certain space.
--
-- Return:
-- - sharding key as index object, when sharding key definition found on
-- storage.
-- - nil, when sharding key definition was not found on storage. Pay attention
-- that nil without error is a successfull return value.
-- - nil and error, when something goes wrong on fetching attempt.
--
function sharding_metadata_module.fetch_sharding_key_on_router(space_name, timeout)
dev_checks('string', '?number')

return fetch_on_router(space_name, cache.SHARDING_KEY_MAP_NAME, timeout)
end

-- Get sharding func for a certain space.
--
-- Return:
-- - sharding func as callable object, when sharding func definition found on
-- storage.
-- - nil, when sharding func definition was not found on storage. Pay attention
-- that nil without error is a successfull return value.
-- - nil and error, when something goes wrong on fetching attempt.
--
function sharding_metadata_module.fetch_sharding_func_on_router(space_name, timeout)
dev_checks('string', '?number')

return fetch_on_router(space_name, cache.SHARDING_FUNC_MAP_NAME, timeout)
end

function sharding_metadata_module.update_sharding_key_cache(space_name)
cache.drop_caches()
return sharding_metadata_module.fetch_sharding_key_on_router(space_name)
end

function sharding_metadata_module.update_sharding_func_cache(space_name)
cache.drop_caches()
return sharding_metadata_module.fetch_sharding_func_on_router(space_name)
end

function sharding_metadata_module.init()
_G._crud.fetch_on_storage = sharding_metadata_module.fetch_on_storage
end
Expand Down
3 changes: 3 additions & 0 deletions crud/common/sharding/sharding_metadata_cache.lua
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,15 @@ local fiber = require('fiber')
local sharding_metadata_cache = {}

sharding_metadata_cache.SHARDING_KEY_MAP_NAME = "sharding_key_as_index_obj_map"
sharding_metadata_cache.SHARDING_FUNC_MAP_NAME = "sharding_func_map"
sharding_metadata_cache[sharding_metadata_cache.SHARDING_KEY_MAP_NAME] = nil
sharding_metadata_cache[sharding_metadata_cache.SHARDING_FUNC_MAP_NAME] = nil
sharding_metadata_cache.fetch_lock = fiber.channel(1)
sharding_metadata_cache.is_part_of_pk = {}

function sharding_metadata_cache.drop_caches()
sharding_metadata_cache[sharding_metadata_cache.SHARDING_KEY_MAP_NAME] = nil
sharding_metadata_cache[sharding_metadata_cache.SHARDING_FUNC_MAP_NAME] = nil
if sharding_metadata_cache.fetch_lock ~= nil then
sharding_metadata_cache.fetch_lock:close()
end
Expand Down
6 changes: 5 additions & 1 deletion crud/delete.lua
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,11 @@ local function call_delete_on_router(space_name, key, opts)
end
end

local bucket_id = sharding.key_get_bucket_id(sharding_key, opts.bucket_id)
local bucket_id, err = sharding.key_get_bucket_id(space_name, sharding_key, opts.bucket_id)
if err ~= nil then
return nil, err
end

local call_opts = {
mode = 'write',
timeout = opts.timeout,
Expand Down
6 changes: 5 additions & 1 deletion crud/get.lua
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,11 @@ local function call_get_on_router(space_name, key, opts)
end
end

local bucket_id = sharding.key_get_bucket_id(sharding_key, opts.bucket_id)
local bucket_id, err = sharding.key_get_bucket_id(space_name, sharding_key, opts.bucket_id)
if err ~= nil then
return nil, err
end

local call_opts = {
mode = opts.mode or 'read',
prefer_replica = opts.prefer_replica,
Expand Down
6 changes: 5 additions & 1 deletion crud/select/compat/select.lua
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,11 @@ local function build_select_iterator(space_name, user_conditions, opts)
local perform_map_reduce = opts.force_map_call == true or
(opts.bucket_id == nil and plan.sharding_key == nil)
if not perform_map_reduce then
local bucket_id = sharding.key_get_bucket_id(plan.sharding_key, opts.bucket_id)
local bucket_id, err = sharding.key_get_bucket_id(space_name, plan.sharding_key, opts.bucket_id)
if err ~= nil then
return nil, err
end

assert(bucket_id ~= nil)

local err
Expand Down
6 changes: 5 additions & 1 deletion crud/select/compat/select_old.lua
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,11 @@ local function build_select_iterator(space_name, user_conditions, opts)
local perform_map_reduce = opts.force_map_call == true or
(opts.bucket_id == nil and plan.sharding_key == nil)
if not perform_map_reduce then
local bucket_id = sharding.key_get_bucket_id(plan.sharding_key, opts.bucket_id)
local bucket_id, err = sharding.key_get_bucket_id(space_name, plan.sharding_key, opts.bucket_id)
if err ~= nil then
return nil, err
end

assert(bucket_id ~= nil)

local err
Expand Down
6 changes: 5 additions & 1 deletion crud/update.lua
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,11 @@ local function call_update_on_router(space_name, key, user_operations, opts)
end
end

local bucket_id = sharding.key_get_bucket_id(sharding_key, opts.bucket_id)
local bucket_id, err = sharding.key_get_bucket_id(space_name, sharding_key, opts.bucket_id)
if err ~= nil then
return nil, err
end

local call_opts = {
mode = 'write',
timeout = opts.timeout,
Expand Down
5 changes: 5 additions & 0 deletions deps.sh
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,9 @@ tarantoolctl rocks install https://raw.githubusercontent.com/moteus/lua-path/mas
tarantoolctl rocks install https://raw.githubusercontent.com/moteus/luacov-coveralls/master/rockspecs/luacov-coveralls-scm-0.rockspec

tarantoolctl rocks install cartridge 2.7.1

# cartridge depends on ddl 1.5.0-1 (version without
# sharding func support), install latest version
tarantoolctl rocks install ddl scm-1

tarantoolctl rocks make
29 changes: 29 additions & 0 deletions test/entrypoint/srv_ddl.lua
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,18 @@ local cartridge = require('cartridge')
local ddl = require('ddl')

package.preload['customers-storage'] = function()
-- set sharding func in dot.notation
-- in _G for sharding func tests
local some_module = {
sharding_func =
function(key)
if key ~= nil and key[1] ~= nil then
return key[1] % 10
end
end
}
rawset(_G, 'some_module', some_module)

return {
role_name = 'customers-storage',
init = function()
Expand Down Expand Up @@ -131,6 +143,18 @@ package.preload['customers-storage'] = function()
table.insert(customers_name_age_key_three_fields_index_schema.indexes, bucket_id_index)
table.insert(customers_name_age_key_three_fields_index_schema.indexes, three_fields_index)

local customers_id_key_schema = table.deepcopy(customers_schema)
customers_id_key_schema.sharding_key = {'id'}
table.insert(customers_id_key_schema.indexes, primary_index)
table.insert(customers_id_key_schema.indexes, bucket_id_index)
table.insert(customers_id_key_schema.indexes, name_index)

local customers_body_func_schema = table.deepcopy(customers_id_key_schema)
customers_body_func_schema.sharding_func = { body = 'function(key) return key[1] % 10 end' }

local customers_G_func_schema = table.deepcopy(customers_id_key_schema)
customers_G_func_schema.sharding_func = 'some_module.sharding_func'

local schema = {
spaces = {
customers_name_key = customers_name_key_schema,
Expand All @@ -140,6 +164,8 @@ package.preload['customers-storage'] = function()
customers_age_key = customers_age_key_schema,
customers_name_age_key_different_indexes = customers_name_age_key_different_indexes_schema,
customers_name_age_key_three_fields_index = customers_name_age_key_three_fields_index_schema,
customers_G_func = customers_G_func_schema,
customers_body_func = customers_body_func_schema,
}
}

Expand All @@ -154,6 +180,9 @@ package.preload['customers-storage'] = function()
local fieldno_sharding_key = 2
box.space['_ddl_sharding_key']:update(space_name, {{'=', fieldno_sharding_key, sharding_key_def}})
end)
rawset(_G, 'set_sharding_func', function(space_name, fieldno_sharding_func, sharding_func_def)
box.space['_ddl_sharding_func']:update(space_name, {{'=', fieldno_sharding_func, sharding_func_def}})
end)
end,
}
end
Expand Down
Loading

0 comments on commit c63b589

Please sign in to comment.