Skip to content

Commit

Permalink
Use DDL sharding key to calculate bucket_id
Browse files Browse the repository at this point in the history
Previously by default primary key is used to calculate bucket id.

CRUD allows to automatically calculate `bucket_id` based on primary key.
However DDL users may set sharding key in DDL schema and now CRUD
allows to use that sharding key to calculate bucket id.

Closes #166
  • Loading branch information
ligurio committed Jul 28, 2021
1 parent ea7560d commit ca5eced
Show file tree
Hide file tree
Showing 10 changed files with 564 additions and 8 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
* `crud.len()` function to calculate the number of tuples
in the space for memtx engine and calculate the maximum
approximate number of tuples in the space for vinyl engine.
* CRUD operations automatically calculate bucket id using sharding
key specified with DDL schema.

## [0.8.0] - 02-07-21

Expand All @@ -26,6 +28,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.

### Added

* Support calculating `bucket_id` based on `ddl.sharding_key`.
* Added jsonpath indexes support for queries
* `tuple-merger` module updated to 0.0.2

Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ crud.unflatten_rows(res.rows, res.metadata)

* A space should have a format.
* By default, `bucket_id` is computed as `vshard.router.bucket_id_strcrc32(key)`,
where `key` is the primary key value.
where `key` is the primary key value or a [sharding key set by DDL schema](https://github.com/tarantool/ddl#input-data-format).
Custom bucket ID can be specified as `opts.bucket_id` for each operation.
For operations that accepts tuple/object bucket ID can be specified as
tuple/object field as well as `opts.bucket_id` value.
Expand Down
106 changes: 105 additions & 1 deletion crud/common/sharding.lua
Original file line number Diff line number Diff line change
@@ -1,10 +1,21 @@
local vshard = require('vshard')
local errors = require('errors')
local ddl, err = pcall(require, 'ddl')
if ddl == true then
ddl = err
end

local BucketIDError = errors.new_class("BucketIDError", {capture_stack = false})
local ShardingKeyError = errors.new_class("ShardingKeyError", {capture_stack = false})

local dev_checks = require('crud.common.dev_checks')
local utils = require('crud.common.utils')

-- TODO: invalidate ddl_schema_cache
local ddl_schema_cache = nil
local primary_and_sharding_key_matched = nil
local sharding_key_in_primary_index = nil

local sharding = {}

function sharding.key_get_bucket_id(key, specified_bucket_id)
Expand All @@ -20,7 +31,16 @@ function sharding.tuple_get_bucket_id(tuple, space, specified_bucket_id)
return specified_bucket_id
end

local key = utils.extract_key(tuple, space.index[0].parts)
local primary_index = space.index[0]
local key
local sharding_key = sharding.get_ddl_sharding_key(space.name)
if sharding_key ~= nil then
local sharding_key_fieldno_map = utils.get_keys_fieldno_map(space:format(), sharding_key)
key = utils.extract_sharding_key(tuple, primary_index.parts, sharding_key_fieldno_map)
else
key = utils.extract_key(tuple, primary_index.parts)
end

return sharding.key_get_bucket_id(key)
end

Expand Down Expand Up @@ -51,4 +71,88 @@ function sharding.tuple_set_and_return_bucket_id(tuple, space, specified_bucket_
return bucket_id
end

-- Get sharding key (actually field names) using DDL module.
function sharding.get_ddl_sharding_key(space_name)
dev_checks('string')
if ddl == false then
return nil
end
if ddl_schema_cache == nil then
ddl_schema_cache = ddl.get_schema()
end
if ddl_schema_cache.spaces == nil then
return nil
end
local space_schema = ddl_schema_cache.spaces[space_name]
if space_schema == nil or
space_schema.sharding_key == nil then
return nil
end

return space_schema.sharding_key
end

-- Make sure fields used in sharding key are present in primary index.
-- TODO: invalidate sharding_key_missed_in_index
function sharding.is_sharding_key_in_primary_index(space_name, primary_index, sharding_key_fieldno_map)
dev_checks('string', 'table', 'table')
if sharding_key_in_primary_index[space_name] ~= nil then
return sharding_key_in_primary_index[space_name]
end
local primary_index_fieldno = utils.get_index_fieldno_map(primary_index)
for fieldno in pairs(sharding_key_fieldno_map) do
if primary_index_fieldno[fieldno] == false then
sharding_key_in_primary_index[space_name] = false
break
end
end

return sharding_key_in_primary_index[space_name]
end

-- Build an array with sharding key values.
-- Returns a table with sharding keys values or nil.
local function build_sharding_key(key, space_name, primary_index, sharding_key_fieldno_map)
dev_checks('table', 'table', 'string')
local sharding_key = {}
if primary_and_sharding_key_matched[space_name] == nil then
for i, k in ipairs(key) do
local fieldno = primary_index.parts[i].fieldno
primary_and_sharding_key_matched[space_name] = sharding_key_fieldno_map[fieldno] or false
if primary_and_sharding_key_matched[space_name] == false then
break
end
table.insert(sharding_key, k)
end
end
if primary_and_sharding_key_matched[space_name] == false then
return nil, ShardingKeyError:new("Primary and sharding keys are not matched")
end

return sharding_key
end

-- Build sharding key using DDL sharding key.
-- Returns a table with sharding key or pair of nil with error.
function sharding.build_ddl_sharding_key(space_obj, sharding_key, ddl_sharding_key, bucket_id)
dev_checks('table', 'table', 'table', 'number')
local primary_index = space_obj.index[0]
local space_format = space_obj:format()
local space_name = space_obj.name

local sharding_key_fieldno_map = utils.get_keys_fieldno_map(space_format, ddl_sharding_key)
if sharding_key_fieldno_map == nil then
return nil, ShardingKeyError:new("Sharding key(s) not found in a space format")
end
if sharding.is_sharding_key_in_primary_index(space_name, primary_index, sharding_key_fieldno_map) == false and
bucket_id == nil then
return nil, ShardingKeyError:new("Sharding key is missed in primary index, specify bucket_id")
end
if type(sharding_key) ~= 'table' then
sharding_key = {sharding_key}
end

return build_sharding_key(sharding_key, space_name, primary_index, sharding_key_fieldno_map)
end

return sharding
58 changes: 58 additions & 0 deletions crud/common/utils.lua
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,17 @@ function utils.unflatten(tuple, space_format)
return object
end

function utils.extract_sharding_key(tuple, key_parts, sharding_fieldno_map)
local key = {}
for i, part in ipairs(key_parts) do
local fieldno = part.fieldno
if sharding_fieldno_map[fieldno] ~= nil then
key[i] = tuple[fieldno]
end
end
return key
end

function utils.extract_key(tuple, key_parts)
local key = {}
for i, part in ipairs(key_parts) do
Expand Down Expand Up @@ -375,6 +386,43 @@ function utils.get_bucket_id_fieldno(space, shard_index_name)
return bucket_id_index.parts[1].fieldno
end

function utils.get_format_fieldno_map(space_format)
local t = {}
for i, field_format in ipairs(space_format) do
t[field_format.name] = i
end

return t
end

--- Get a map with fieldno of passed field's names.
--
-- @function get_keys_fieldno_map
--
-- @param table space_format
-- A space format
--
-- @param keys
-- A table with field names.
--
-- @return[1] table
-- @return[2] nil
--
function utils.get_keys_fieldno_map(space_format, field_names)
dev_checks('table', 'table')
local t = {}
local fieldno_map = utils.get_format_fieldno_map(space_format)
for _, field_name in ipairs(field_names) do
local fieldno = fieldno_map[field_name]
if fieldno == nil then
return nil
end
t[fieldno] = true
end

return t
end

local uuid_t = ffi.typeof('struct tt_uuid')
function utils.is_uuid(value)
return ffi.istype(uuid_t, value)
Expand Down Expand Up @@ -524,4 +572,14 @@ function utils.flatten_obj_reload(space_name, obj)
return schema.wrap_func_reload(flatten_obj, space_name, obj)
end

function utils.get_index_fieldno_map(index_obj)
local t = {}
for i, part in ipairs(index_obj.parts) do
local fieldno = part[i].fieldno
t[fieldno] = true
end

return t
end

return utils
12 changes: 11 additions & 1 deletion crud/delete.lua
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,17 @@ local function call_delete_on_router(space_name, key, opts)
key = key:totable()
end

local bucket_id = sharding.key_get_bucket_id(key, opts.bucket_id)
local ddl_sharding_key = sharding.get_ddl_sharding_key(space_name)
local sharding_key = key
if ddl_sharding_key ~= nil then
local err
sharding_key, err = sharding_key.build_ddl_sharding_key(space, sharding_key, ddl_sharding_key, opts.bucket_id)
if sharding_key == nil then
return nil, err
end
end

local bucket_id = sharding.key_get_bucket_id(sharding_key, opts.bucket_id)
local call_opts = {
mode = 'write',
timeout = opts.timeout,
Expand Down
12 changes: 11 additions & 1 deletion crud/get.lua
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,17 @@ local function call_get_on_router(space_name, key, opts)
key = key:totable()
end

local bucket_id = sharding.key_get_bucket_id(key, opts.bucket_id)
local ddl_sharding_key = sharding.get_ddl_sharding_key(space_name)
local sharding_key = key
if ddl_sharding_key ~= nil then
local err
sharding_key, err = sharding_key.build_ddl_sharding_key(space, sharding_key, ddl_sharding_key, opts.bucket_id)
if sharding_key == nil then
return nil, err
end
end

local bucket_id = sharding.key_get_bucket_id(sharding_key, opts.bucket_id)
local call_opts = {
mode = opts.mode or 'read',
prefer_replica = opts.prefer_replica,
Expand Down
20 changes: 17 additions & 3 deletions crud/select/plan.lua
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ local errors = require('errors')
local compare_conditions = require('crud.compare.conditions')
local utils = require('crud.common.utils')
local dev_checks = require('crud.common.dev_checks')
local sharding = require('crud.common.sharding')

local compat = require('crud.common.compat')
local has_keydef = compat.exists('tuple.keydef', 'key_def')
Expand Down Expand Up @@ -48,7 +49,7 @@ local function get_index_for_condition(space_indexes, space_format, condition)
end
end

local function extract_sharding_key_from_scan_value(scan_value, scan_index, sharding_index)
local function extract_sharding_key_from_scan_value(scan_value, scan_index, sharding_index, space_format, space_name)
if #scan_value < #sharding_index.parts then
return nil
end
Expand All @@ -64,6 +65,11 @@ local function extract_sharding_key_from_scan_value(scan_value, scan_index, shar

-- check that sharding key is included in the scan index fields
local sharding_key = {}
local ddl_sharding_key = sharding.get_ddl_sharding_key(space_name)
local sharding_key_fieldno_map = {}
if ddl_sharding_key ~= nil then
sharding_key_fieldno_map = utils.get_keys_fieldno_map(space_format, ddl_sharding_key)
end
for _, sharding_key_part in ipairs(sharding_index.parts) do
local fieldno = sharding_key_part.fieldno

Expand All @@ -79,7 +85,14 @@ local function extract_sharding_key_from_scan_value(scan_value, scan_index, shar
return nil
end

table.insert(sharding_key, field_value)
-- check if a field is a part of DDL sharding key
if next(sharding_key_fieldno_map) ~= nil then
if sharding_key_fieldno_map[fieldno] == true then
table.insert(sharding_key, field_value)
end
else
table.insert(sharding_key, field_value)
end
end

return sharding_key
Expand Down Expand Up @@ -231,7 +244,8 @@ function select_plan.new(space, conditions, opts)
-- get sharding key value
local sharding_key
if scan_value ~= nil and (scan_iter == box.index.EQ or scan_iter == box.index.REQ) then
sharding_key = extract_sharding_key_from_scan_value(scan_value, scan_index, sharding_index)
sharding_key = extract_sharding_key_from_scan_value(scan_value, scan_index, sharding_index,
space_format, space_name)
end

if sharding_key ~= nil and opts.force_map_call ~= true then
Expand Down
11 changes: 10 additions & 1 deletion crud/update.lua
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,15 @@ local function call_update_on_router(space_name, key, user_operations, opts)
key = key:totable()
end

local ddl_sharding_key = sharding.get_ddl_sharding_key(space_name)
local sharding_key = key
if ddl_sharding_key ~= nil then
sharding_key, err = sharding_key.build_ddl_sharding_key(space, sharding_key, ddl_sharding_key, opts.bucket_id)
if sharding_key == nil then
return nil, err
end
end

local operations = user_operations
if not utils.tarantool_supports_fieldpaths() then
operations, err = utils.convert_operations(user_operations, space_format)
Expand All @@ -91,7 +100,7 @@ local function call_update_on_router(space_name, key, user_operations, opts)
end
end

local bucket_id = sharding.key_get_bucket_id(key, opts.bucket_id)
local bucket_id = sharding.key_get_bucket_id(sharding_key, opts.bucket_id)
local call_opts = {
mode = 'write',
timeout = opts.timeout,
Expand Down
Loading

0 comments on commit ca5eced

Please sign in to comment.