Add support of custom sharding func for crud methods
This commit modifies the functions that fetch sharding metadata
on storage and router so that they also fetch the sharding function.
The function `sharding.key_get_bucket_id` now calculates bucket_id
using the DDL sharding function if a sharding function exists for
the specified space. Documentation, integration tests and unit tests
are added as well.

Closes #237
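
The resolution order described in the message above can be sketched in plain Lua. This is a minimal illustration, not the crud implementation: `custom_sharding_funcs` and `default_bucket_id` are hypothetical stand-ins for the router-side sharding function cache and for `vshard.router.bucket_id_strcrc32`.

```lua
-- Hypothetical stand-in for the per-space sharding function cache.
local custom_sharding_funcs = {
    customers = function(key) return (key[1] % 10) + 1 end,
}

-- Hypothetical toy hash used here instead of strcrc32.
local function default_bucket_id(key)
    local sum = 0
    for _, part in ipairs(key) do
        local s = tostring(part)
        for i = 1, #s do
            sum = (sum + s:byte(i)) % 3000
        end
    end
    return sum + 1
end

local function key_get_bucket_id(space_name, key, specified_bucket_id)
    -- 1. An explicitly passed bucket_id always wins.
    if specified_bucket_id ~= nil then
        return specified_bucket_id
    end
    -- 2. Otherwise use the custom sharding function of the space, if any.
    local sharding_func = custom_sharding_funcs[space_name]
    if sharding_func ~= nil then
        return sharding_func(key)
    end
    -- 3. Fall back to the default hash.
    return default_bucket_id(key)
end

print(key_get_bucket_id('customers', {42}, 7)) -- explicit bucket_id
print(key_get_bucket_id('customers', {42}))    -- custom sharding function
print(key_get_bucket_id('orders', {42}))       -- default hash
```

In the actual change the second step can also fail (fetching metadata from storages may return an error), which is why every caller now checks a second return value.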
AnaNek committed Feb 21, 2022
1 parent 6b7151c commit 2887e18
Showing 19 changed files with 886 additions and 32 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -23,6 +23,8 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
must be a part of primary key.
* `crud.count()` function to calculate the number of tuples
in the space according to conditions.
* Support calculating bucket id using the sharding func specified in the
DDL schema or in the `_ddl_sharding_func` space.

### Fixed

32 changes: 31 additions & 1 deletion README.md
@@ -77,6 +77,25 @@ documentation). As soon as sharding key for a certain space is available in
automatically. Note that CRUD methods `delete()`, `get()` and `update()`
require that the sharding key be a part of the primary key.

You can specify a sharding function for calculating bucket_id either by
adding a sharding func definition to the [DDL
schema](https://github.com/tarantool/ddl#input-data-format)
or by inserting it manually into the `_ddl_sharding_func` space.
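
As an illustration, the fragment below sketches the two forms a sharding func definition can take: a function name in dot notation, or a table with a `body` string. The space names and the function body are hypothetical, and only the sharding-related fields are shown; a real DDL schema also needs `engine`, `format`, `indexes` and the other required fields, so check the DDL documentation linked above for the exact format.

```lua
-- Sketch only: space names and the function body are illustrative.
local schema = {
    spaces = {
        customers = {
            sharding_key = {'name'},
            -- Form 1: a function referenced by name in dot notation.
            sharding_func = 'vshard.router.bucket_id_mpcrc32',
        },
        orders = {
            sharding_key = {'customer_id'},
            -- Form 2: a function given as source code in the `body` field.
            sharding_func = {
                body = 'function(key) return key[1] % 3000 + 1 end',
            },
        },
    },
}

return schema
```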

CRUD uses `strcrc32` as the sharding function by default.
Using `strcrc32` is undesirable because this sharding function
is not consistent for cdata numbers. In particular, it returns
3 different values for normal Lua numbers like 123, for
`unsigned long long` cdata (like `123ULL`, or
`ffi.cast('unsigned long long', 123)`), and for `signed long long`
cdata (like `123LL`, or `ffi.cast('long long', 123)`).

We cannot change the default sharding function `strcrc32`
due to backward compatibility concerns, but please consider
using a better alternative as the sharding function.
`mpcrc32` is one of them.
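
A likely source of the inconsistency can be seen without vshard at all. The sketch below assumes that `strcrc32` hashes the string form of each key part (an assumption about vshard internals, not something stated here) and simply prints how LuaJIT converts the three kinds of numbers to strings. The `ffi` part is guarded so the snippet also loads on a Lua interpreter without FFI.

```lua
-- The ffi module is available on LuaJIT (and therefore in Tarantool).
local has_ffi, ffi = pcall(require, 'ffi')

-- String forms of "the same" number 123 in different representations.
local representations = {tostring(123)}
if has_ffi then
    table.insert(representations,
                 tostring(ffi.cast('unsigned long long', 123)))
    table.insert(representations,
                 tostring(ffi.cast('long long', 123)))
end

-- On LuaJIT this prints 123, 123ULL and 123LL: three different strings,
-- so any hash computed over the string form gives three different results.
for _, repr in ipairs(representations) do
    print(repr)
end
```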

The table below describes which operations support a custom sharding key:

| CRUD method | Sharding key support |
@@ -101,12 +120,23 @@ Current limitations for using custom sharding key:
- It's not possible to update sharding keys automatically when schema is
updated on storages, see
[#212](https://github.com/tarantool/crud/issues/212). However it is possible
to do it manually with `require('crud.common.sharding_key').update_cache()`.
to do it manually with `require('crud.common.sharding_key').update_cache()`
(this function updates both caches, the sharding key cache and the sharding
function cache, but the returned value is the sharding key from the cache).
- No support of JSON path for sharding key, see
[#219](https://github.com/tarantool/crud/issues/219).
- `primary_index_fieldno_map` is not cached, see
[#243](https://github.com/tarantool/crud/issues/243).

Current limitations for using custom sharding functions:

- It's not possible to update sharding functions automatically when schema is
updated on storages, see
[#212](https://github.com/tarantool/crud/issues/212). However it is possible
to do it manually with `require('crud.common.sharding_func').update_cache()`
(this function updates both caches, the sharding key cache and the sharding
function cache, but the returned value is the sharding function from the cache).

### Insert

```lua
16 changes: 14 additions & 2 deletions crud/common/sharding/init.lua
@@ -5,6 +5,7 @@ local BucketIDError = errors.new_class("BucketIDError", {capture_stack = false})
local GetReplicasetsError = errors.new_class('GetReplicasetsError', {capture_stack = false})

local utils = require('crud.common.utils')
local dev_checks = require('crud.common.dev_checks')
local sharding_metadata_module = require('crud.common.sharding.sharding_metadata')

local sharding = {}
@@ -20,11 +21,22 @@ function sharding.get_replicasets_by_bucket_id(bucket_id)
}
end

function sharding.key_get_bucket_id(key, specified_bucket_id)
function sharding.key_get_bucket_id(space_name, key, specified_bucket_id)
dev_checks('string', '?', '?number|cdata')

if specified_bucket_id ~= nil then
return specified_bucket_id
end

local sharding_func, err = sharding_metadata_module.fetch_sharding_func_on_router(space_name)
if err ~= nil then
return nil, err
end

if sharding_func ~= nil then
return sharding_func(key)
end

return vshard.router.bucket_id_strcrc32(key)
end

@@ -43,7 +55,7 @@ function sharding.tuple_get_bucket_id(tuple, space, specified_bucket_id)
end
local key = utils.extract_key(tuple, sharding_index_parts)

return sharding.key_get_bucket_id(key)
return sharding.key_get_bucket_id(space.name, key)
end

function sharding.tuple_set_and_return_bucket_id(tuple, space, specified_bucket_id)
29 changes: 29 additions & 0 deletions crud/common/sharding/sharding_func.lua
@@ -1,5 +1,8 @@
local errors = require('errors')
local log = require('log')

local dev_checks = require('crud.common.dev_checks')
local cache = require('crud.common.sharding.sharding_metadata_cache')
local utils = require('crud.common.utils')

local ShardingFuncError = errors.new_class('ShardingFuncError', {capture_stack = false})
@@ -74,6 +77,32 @@ local function as_callable_object(sharding_func_def, space_name)
)
end

function sharding_func_module.construct_as_callable_obj_cache(metadata_map, specified_space_name)
dev_checks('table', 'string')

local result_err

cache.sharding_func_map = {}
for space_name, metadata in pairs(metadata_map) do
if metadata.sharding_func_def ~= nil then
local sharding_func, err = as_callable_object(metadata.sharding_func_def,
space_name)
if err ~= nil then
if specified_space_name == space_name then
result_err = err
log.error(err)
else
log.warn(err)
end
end

cache.sharding_func_map[space_name] = sharding_func
end
end

return result_err
end
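
The error policy of `construct_as_callable_obj_cache` (a broken definition is fatal only for the space being requested, and merely logged for the others) can be sketched standalone. Here `as_callable` is a hypothetical, simplified stand-in for `as_callable_object` that handles only the `{body = ...}` form, and the sketch returns the map and a `skipped` list instead of writing to the cache and the log.

```lua
-- loadstring exists in Lua 5.1/LuaJIT, load in newer versions.
local load_chunk = loadstring or load

-- Hypothetical stand-in: turns {body = '...'} into a function.
local function as_callable(def, space_name)
    if type(def) ~= 'table' or type(def.body) ~= 'string' then
        return nil, ('Unsupported sharding func definition for space %q')
            :format(space_name)
    end
    local chunk, err = load_chunk('return ' .. def.body)
    if chunk == nil then
        return nil, ('Bad sharding func body for space %q: %s')
            :format(space_name, err)
    end
    return chunk()
end

local function build_func_map(metadata_map, requested_space)
    local func_map, requested_err, skipped = {}, nil, {}
    for space_name, metadata in pairs(metadata_map) do
        if metadata.sharding_func_def ~= nil then
            local func, err = as_callable(metadata.sharding_func_def,
                                          space_name)
            if err ~= nil then
                if space_name == requested_space then
                    requested_err = err               -- reported to the caller
                else
                    table.insert(skipped, space_name) -- only logged upstream
                end
            end
            func_map[space_name] = func               -- nil when broken
        end
    end
    return func_map, requested_err, skipped
end
```

With this policy a malformed definition for one space does not block requests to other spaces.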

sharding_func_module.internal = {
as_callable_object = as_callable_object,
is_callable = is_callable,
102 changes: 82 additions & 20 deletions crud/common/sharding/sharding_metadata.lua
@@ -5,6 +5,7 @@ local call = require('crud.common.call')
local const = require('crud.common.const')
local dev_checks = require('crud.common.dev_checks')
local cache = require('crud.common.sharding.sharding_metadata_cache')
local sharding_func = require('crud.common.sharding.sharding_func')
local sharding_key = require('crud.common.sharding.sharding_key')

local FetchShardingMetadataError = errors.new_class('FetchShardingMetadataError', {capture_stack = false})
@@ -38,25 +39,58 @@ local function locked(f)
end
end

-- Return a map with metadata or nil when space box.space._ddl_sharding_key is
-- not available on storage.
local function extract_sharding_func_def(tuple)
if not tuple then
return nil
end

local SPACE_SHARDING_FUNC_NAME_FIELDNO = 2
local SPACE_SHARDING_FUNC_BODY_FIELDNO = 3

if tuple[SPACE_SHARDING_FUNC_BODY_FIELDNO] ~= nil then
return {body = tuple[SPACE_SHARDING_FUNC_BODY_FIELDNO]}
end

if tuple[SPACE_SHARDING_FUNC_NAME_FIELDNO] ~= nil then
return tuple[SPACE_SHARDING_FUNC_NAME_FIELDNO]
end

return nil
end
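
To make the precedence explicit (a function body takes priority over a function name), here is a condensed copy of the helper applied to plain Lua tables standing in for `_ddl_sharding_func` tuples. The sample tuples are hypothetical.

```lua
local NAME_FIELDNO, BODY_FIELDNO = 2, 3

-- Condensed version of the helper above; plain tables replace tuples.
local function extract_sharding_func_def(tuple)
    if not tuple then
        return nil
    end
    if tuple[BODY_FIELDNO] ~= nil then
        return {body = tuple[BODY_FIELDNO]}
    end
    return tuple[NAME_FIELDNO] -- may be nil when neither field is set
end

-- {space_name, sharding_func_name, sharding_func_body}
local by_name = extract_sharding_func_def(
    {'customers', 'vshard.router.bucket_id_mpcrc32'})
local by_body = extract_sharding_func_def(
    {'orders', nil, 'function(key) return key[1] % 3000 + 1 end'})

print(by_name)      -- the dot-notation name, returned as a string
print(by_body.body) -- the function source, wrapped in a table
```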

-- Return a map with metadata or nil when spaces box.space._ddl_sharding_key and
-- box.space._ddl_sharding_func are not available on storage.
function sharding_metadata_module.fetch_on_storage()
local sharding_key_space = box.space._ddl_sharding_key
if sharding_key_space == nil then
local sharding_func_space = box.space._ddl_sharding_func

if sharding_key_space == nil and sharding_func_space == nil then
return nil
end

local SPACE_NAME_FIELDNO = 1
local SPACE_SHARDING_KEY_FIELDNO = 2
local metadata_map = {}
for _, tuple in sharding_key_space:pairs() do
local space_name = tuple[SPACE_NAME_FIELDNO]
local sharding_key_def = tuple[SPACE_SHARDING_KEY_FIELDNO]
local space_format = box.space[space_name]:format()
metadata_map[space_name] = {
sharding_key_def = sharding_key_def,
space_format = space_format,
}

if sharding_key_space ~= nil then
for _, tuple in sharding_key_space:pairs() do
local space_name = tuple[SPACE_NAME_FIELDNO]
local sharding_key_def = tuple[SPACE_SHARDING_KEY_FIELDNO]
local space_format = box.space[space_name]:format()
metadata_map[space_name] = {
sharding_key_def = sharding_key_def,
space_format = space_format,
}
end
end

if sharding_func_space ~= nil then
for _, tuple in sharding_func_space:pairs() do
local space_name = tuple[SPACE_NAME_FIELDNO]
local sharding_func_def = extract_sharding_func_def(tuple)
metadata_map[space_name] = metadata_map[space_name] or {}
metadata_map[space_name].sharding_func_def = sharding_func_def
end
end

return metadata_map
@@ -83,24 +117,21 @@ local _fetch_on_router = locked(function(timeout, space_name, metadata_map_name)
end
if metadata_map == nil then
cache[cache.SHARDING_KEY_MAP_NAME] = {}
cache[cache.SHARDING_FUNC_MAP_NAME] = {}
return
end

local err = sharding_key.construct_as_index_obj_cache(metadata_map, space_name)
if err ~= nil then
return err
end

local err = sharding_func.construct_as_callable_obj_cache(metadata_map, space_name)
if err ~= nil then
return err
end
end)

-- Get sharding index for a certain space.
--
-- Return:
-- - sharding key as index object, when sharding key definition found on
-- storage.
-- - nil, when sharding key definition was not found on storage. Pay attention
-- that nil without error is a successful return value.
-- - nil and error, when something goes wrong on fetching attempt.
--
local function fetch_on_router(space_name, metadata_map_name, timeout)
if cache[metadata_map_name] ~= nil then
return cache[metadata_map_name][space_name]
@@ -120,17 +151,48 @@ local function fetch_on_router(space_name, metadata_map_name, timeout)
"Fetching sharding key for space '%s' is failed", space_name)
end

-- Get sharding index for a certain space.
--
-- Return:
-- - sharding key as index object, when sharding key definition found on
-- storage.
-- - nil, when sharding key definition was not found on storage. Pay attention
-- that nil without error is a successful return value.
-- - nil and error, when something goes wrong on fetching attempt.
--
function sharding_metadata_module.fetch_sharding_key_on_router(space_name, timeout)
dev_checks('string', '?number')

return fetch_on_router(space_name, cache.SHARDING_KEY_MAP_NAME, timeout)
end

-- Get sharding func for a certain space.
--
-- Return:
-- - sharding func as callable object, when sharding func definition found on
-- storage.
-- - nil, when sharding func definition was not found on storage. Pay attention
-- that nil without error is a successful return value.
-- - nil and error, when something goes wrong on fetching attempt.
--
function sharding_metadata_module.fetch_sharding_func_on_router(space_name, timeout)
dev_checks('string', '?number')

return fetch_on_router(space_name, cache.SHARDING_FUNC_MAP_NAME, timeout)
end

function sharding_metadata_module.update_sharding_key_cache(space_name)
cache.drop_caches()

return sharding_metadata_module.fetch_sharding_key_on_router(space_name)
end

function sharding_metadata_module.update_sharding_func_cache(space_name)
cache.drop_caches()

return sharding_metadata_module.fetch_sharding_func_on_router(space_name)
end
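
The two `update_*_cache` functions share one behavior that is easy to miss: each drops both caches, a single fetch repopulates both, and only the requested kind of metadata is returned. A minimal in-memory sketch, where `cache`, `fetch_all` and `get` are hypothetical stand-ins for the cache module, the storage fetch and `fetch_on_router`:

```lua
-- Hypothetical in-memory stand-in for the router-side cache.
local cache = {key_map = nil, func_map = nil}
local fetch_count = 0

-- One fetch repopulates both maps (in crud this is a call to storages).
local function fetch_all()
    fetch_count = fetch_count + 1
    cache.key_map = {customers = {'name'}}
    cache.func_map = {customers = function(key) return #key[1] end}
end

-- Serve from the cache, fetching only when the requested map is missing.
local function get(map_name, space_name)
    if cache[map_name] == nil then
        fetch_all()
    end
    return cache[map_name][space_name]
end

local function update_func_cache(space_name)
    cache.key_map, cache.func_map = nil, nil -- drop both caches
    return get('func_map', space_name)       -- return only the function
end
```

Because the caches are always dropped and refilled together, calling either update function is enough to refresh both.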

function sharding_metadata_module.init()
_G._crud.fetch_on_storage = sharding_metadata_module.fetch_on_storage
end
3 changes: 3 additions & 0 deletions crud/common/sharding/sharding_metadata_cache.lua
@@ -3,12 +3,15 @@ local fiber = require('fiber')
local sharding_metadata_cache = {}

sharding_metadata_cache.SHARDING_KEY_MAP_NAME = "sharding_key_as_index_obj_map"
sharding_metadata_cache.SHARDING_FUNC_MAP_NAME = "sharding_func_map"
sharding_metadata_cache[sharding_metadata_cache.SHARDING_KEY_MAP_NAME] = nil
sharding_metadata_cache[sharding_metadata_cache.SHARDING_FUNC_MAP_NAME] = nil
sharding_metadata_cache.fetch_lock = fiber.channel(1)
sharding_metadata_cache.is_part_of_pk = {}

function sharding_metadata_cache.drop_caches()
sharding_metadata_cache[sharding_metadata_cache.SHARDING_KEY_MAP_NAME] = nil
sharding_metadata_cache[sharding_metadata_cache.SHARDING_FUNC_MAP_NAME] = nil
if sharding_metadata_cache.fetch_lock ~= nil then
sharding_metadata_cache.fetch_lock:close()
end
15 changes: 15 additions & 0 deletions crud/common/sharding_func.lua
@@ -0,0 +1,15 @@
local sharding_metadata_module = require('crud.common.sharding.sharding_metadata')

local sharding_func_cache = {}

-- This method is exported here because
-- we already have customers using old API
-- for updating sharding key cache in their
-- projects like `require('crud.common.sharding_key').update_cache()`
-- This method provides similar behavior for
-- sharding function cache.
function sharding_func_cache.update_cache(space_name)
return sharding_metadata_module.update_sharding_func_cache(space_name)
end

return sharding_func_cache
6 changes: 5 additions & 1 deletion crud/count.lua
@@ -162,7 +162,11 @@ local function call_count_on_router(space_name, user_conditions, opts)
local perform_map_reduce = opts.force_map_call == true or
(opts.bucket_id == nil and plan.sharding_key == nil)
if not perform_map_reduce then
local bucket_id = sharding.key_get_bucket_id(plan.sharding_key, opts.bucket_id)
local bucket_id, err = sharding.key_get_bucket_id(space_name, plan.sharding_key, opts.bucket_id)
if err ~= nil then
return nil, err
end

assert(bucket_id ~= nil)

local err
6 changes: 5 additions & 1 deletion crud/delete.lua
@@ -74,7 +74,11 @@ local function call_delete_on_router(space_name, key, opts)
end
end

local bucket_id = sharding.key_get_bucket_id(sharding_key, opts.bucket_id)
local bucket_id, err = sharding.key_get_bucket_id(space_name, sharding_key, opts.bucket_id)
if err ~= nil then
return nil, err
end

local call_opts = {
mode = 'write',
timeout = opts.timeout,
6 changes: 5 additions & 1 deletion crud/get.lua
@@ -77,7 +77,11 @@ local function call_get_on_router(space_name, key, opts)
end
end

local bucket_id = sharding.key_get_bucket_id(sharding_key, opts.bucket_id)
local bucket_id, err = sharding.key_get_bucket_id(space_name, sharding_key, opts.bucket_id)
if err ~= nil then
return nil, err
end

local call_opts = {
mode = opts.mode or 'read',
prefer_replica = opts.prefer_replica,
6 changes: 5 additions & 1 deletion crud/select/compat/select.lua
@@ -103,7 +103,11 @@ local function build_select_iterator(space_name, user_conditions, opts)
local perform_map_reduce = opts.force_map_call == true or
(opts.bucket_id == nil and plan.sharding_key == nil)
if not perform_map_reduce then
local bucket_id = sharding.key_get_bucket_id(plan.sharding_key, opts.bucket_id)
local bucket_id, err = sharding.key_get_bucket_id(space_name, plan.sharding_key, opts.bucket_id)
if err ~= nil then
return nil, err
end

assert(bucket_id ~= nil)

local err
6 changes: 5 additions & 1 deletion crud/select/compat/select_old.lua
@@ -129,7 +129,11 @@ local function build_select_iterator(space_name, user_conditions, opts)
local perform_map_reduce = opts.force_map_call == true or
(opts.bucket_id == nil and plan.sharding_key == nil)
if not perform_map_reduce then
local bucket_id = sharding.key_get_bucket_id(plan.sharding_key, opts.bucket_id)
local bucket_id, err = sharding.key_get_bucket_id(space_name, plan.sharding_key, opts.bucket_id)
if err ~= nil then
return nil, err
end

assert(bucket_id ~= nil)

local err