Skip to content

Commit

Permalink
Use DDL sharding key to calculate bucket_id
Browse files Browse the repository at this point in the history
Previously by default primary key is used to calculate bucket id.

CRUD allows to automatically calculate `bucket_id` based on primary key.
However DDL users may set sharding key in DDL schema and now CRUD
allows to use that sharding key to calculate bucket id.

Closes #166
  • Loading branch information
ligurio committed Jul 29, 2021
1 parent ea7560d commit 3dccab0
Show file tree
Hide file tree
Showing 10 changed files with 539 additions and 8 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
* `crud.len()` function to calculate the number of tuples
in the space for memtx engine and calculate the maximum
approximate number of tuples in the space for vinyl engine.
* CRUD operations automatically calculate bucket id using sharding
key specified with DDL schema.

## [0.8.0] - 02-07-21

Expand All @@ -26,6 +28,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.

### Added

* Support calculating `bucket_id` based on `ddl.sharding_key`.
* Added jsonpath indexes support for queries
* `tuple-merger` module updated to 0.0.2

Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ crud.unflatten_rows(res.rows, res.metadata)

* A space should have a format.
* By default, `bucket_id` is computed as `vshard.router.bucket_id_strcrc32(key)`,
where `key` is the primary key value.
where `key` is the primary key value or a [sharding key set by DDL schema](https://github.com/tarantool/ddl#input-data-format).
Custom bucket ID can be specified as `opts.bucket_id` for each operation.
For operations that accepts tuple/object bucket ID can be specified as
tuple/object field as well as `opts.bucket_id` value.
Expand Down
100 changes: 99 additions & 1 deletion crud/common/sharding.lua
Original file line number Diff line number Diff line change
@@ -1,10 +1,20 @@
local vshard = require('vshard')
local errors = require('errors')
local ddl, err = pcall(require, 'ddl')
if ddl == true then
ddl = err
end

local BucketIDError = errors.new_class("BucketIDError", {capture_stack = false})
local ShardingKeyError = errors.new_class("ShardingKeyError", {capture_stack = false})

local dev_checks = require('crud.common.dev_checks')
local utils = require('crud.common.utils')

-- TODO: invalidate ddl_schema_cache
local ddl_schema_cache = nil
local sharding_key_in_primary_index = {}

local sharding = {}

function sharding.key_get_bucket_id(key, specified_bucket_id)
Expand All @@ -20,7 +30,20 @@ function sharding.tuple_get_bucket_id(tuple, space, specified_bucket_id)
return specified_bucket_id
end

local key = utils.extract_key(tuple, space.index[0].parts)
local primary_index = space.index[0]
local key
local sharding_key = sharding.get_ddl_sharding_key(space.name)
if sharding_key ~= nil then
local sharding_keys_fieldnos = utils.get_keys_fieldnos(space:format(), sharding_key)
if sharding_keys_fieldnos ~= nil then
key = utils.extract_sharding_key(tuple, sharding_keys_fieldnos)
end
end

if key == nil then
key = utils.extract_key(tuple, primary_index.parts)
end

return sharding.key_get_bucket_id(key)
end

Expand Down Expand Up @@ -51,4 +74,79 @@ function sharding.tuple_set_and_return_bucket_id(tuple, space, specified_bucket_
return bucket_id
end

-- Get sharding key (actually field names) using DDL module.
function sharding.get_ddl_sharding_key(space_name)
dev_checks('string')
if ddl == false then
return nil
end
if ddl_schema_cache == nil then
ddl_schema_cache = ddl.get_schema()
end
assert(type(ddl_schema_cache.spaces), 'table')
local space_schema = ddl_schema_cache.spaces[space_name]
if space_schema == nil or
space_schema.sharding_key == nil then
return nil
end

return space_schema.sharding_key
end

-- Make sure fields used in sharding key are present in primary index.
-- TODO: invalidate table sharding_key_in_primary_index
function sharding.is_sharding_key_in_primary_index(space_name, primary_index, fieldnos)
dev_checks('string', 'table', 'table')
if sharding_key_in_primary_index[space_name] == nil then
local primary_index_fieldno = utils.get_index_fieldno_map(primary_index)
for _, fieldno in ipairs(fieldnos) do
if primary_index_fieldno[fieldno] == false then
sharding_key_in_primary_index[space_name] = false
break
end
end
end

return sharding_key_in_primary_index[space_name]
end

-- Build an array with sharding key values.
-- Returns a table with sharding keys values or nil.
local function build_sharding_key(key, index_parts, sharding_key_fieldno_map)
dev_checks('table', 'table', 'table')
local sharding_key = {}
for i, k in ipairs(key) do
local fieldno = index_parts[i].fieldno
if sharding_key_fieldno_map[fieldno] == true then
table.insert(sharding_key, k)
end
end

return sharding_key
end

-- Build sharding key using DDL sharding key.
-- Returns a table with sharding key or pair of nil with error.
function sharding.build_ddl_sharding_key(space_obj, sharding_key, ddl_sharding_key, bucket_id)
dev_checks('table', '?', 'table', 'number|nil')
local primary_index = space_obj.index[0]
local space_format = space_obj:format()
local space_name = space_obj.name

local sharding_keys_fieldnos = utils.get_keys_fieldnos(space_format, ddl_sharding_key)
if sharding_keys_fieldnos == nil then
return nil, ShardingKeyError:new("Sharding key(s) not found in a space format")
end
if sharding.is_sharding_key_in_primary_index(space_name, primary_index, sharding_keys_fieldnos) == false and
bucket_id == nil then
return nil, ShardingKeyError:new("Sharding key is missed in primary index, specify bucket_id")
end
if type(sharding_key) ~= 'table' then
sharding_key = {sharding_key}
end

local sharding_key_fieldno_map = utils.get_keys_fieldno_map(space_format, ddl_sharding_key)
return build_sharding_key(sharding_key, primary_index.parts, sharding_key_fieldno_map)
end

return sharding
71 changes: 71 additions & 0 deletions crud/common/utils.lua
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,15 @@ function utils.unflatten(tuple, space_format)
return object
end

function utils.extract_sharding_key(tuple, fieldnos)
local key = {}
for _, fieldno in ipairs(fieldnos) do
key[fieldno] = tuple[fieldno]
end

return key
end

function utils.extract_key(tuple, key_parts)
local key = {}
for i, part in ipairs(key_parts) do
Expand Down Expand Up @@ -375,6 +384,68 @@ function utils.get_bucket_id_fieldno(space, shard_index_name)
return bucket_id_index.parts[1].fieldno
end

function utils.get_index_fieldno_map(index_obj)
local t = {}
for _, part in ipairs(index_obj.parts) do
local fieldno = part.fieldno
t[fieldno] = true
end

return t
end

function utils.get_format_fieldno_map(space_format)
local t = {}
for fieldno, field_format in ipairs(space_format) do
t[field_format.name] = fieldno
end

return t
end

--- Get a map with fieldno of passed field's names.
--
-- @function get_keys_fieldno_map
--
-- @param table space_format
-- A space format
--
-- @param keys
-- A table with field names.
--
-- @return[1] table
-- @return[2] nil
--
function utils.get_keys_fieldno_map(space_format, field_names)
dev_checks('table', 'table')
local t = {}
local fieldno_map = utils.get_format_fieldno_map(space_format)
for _, field_name in ipairs(field_names) do
local fieldno = fieldno_map[field_name]
if fieldno == nil then
return nil
end
t[fieldno] = true
end

return t
end

function utils.get_keys_fieldnos(space_format, field_names)
dev_checks('table', 'table')
local t = {}
local format_fieldno_map = utils.get_format_fieldno_map(space_format)
for _, field_name in ipairs(field_names) do
local fieldno = format_fieldno_map[field_name]
if fieldno == nil then
return nil
end
table.insert(t, fieldno)
end

return t
end

local uuid_t = ffi.typeof('struct tt_uuid')
function utils.is_uuid(value)
return ffi.istype(uuid_t, value)
Expand Down
12 changes: 11 additions & 1 deletion crud/delete.lua
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,17 @@ local function call_delete_on_router(space_name, key, opts)
key = key:totable()
end

local bucket_id = sharding.key_get_bucket_id(key, opts.bucket_id)
local ddl_sharding_key = sharding.get_ddl_sharding_key(space_name)
local sharding_key = key
if ddl_sharding_key ~= nil then
local err
sharding_key, err = sharding.build_ddl_sharding_key(space, sharding_key, ddl_sharding_key, opts.bucket_id)
if sharding_key == nil then
return nil, err
end
end

local bucket_id = sharding.key_get_bucket_id(sharding_key, opts.bucket_id)
local call_opts = {
mode = 'write',
timeout = opts.timeout,
Expand Down
12 changes: 11 additions & 1 deletion crud/get.lua
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,17 @@ local function call_get_on_router(space_name, key, opts)
key = key:totable()
end

local bucket_id = sharding.key_get_bucket_id(key, opts.bucket_id)
local ddl_sharding_key = sharding.get_ddl_sharding_key(space_name)
local sharding_key = key
if ddl_sharding_key ~= nil then
local err
sharding_key, err = sharding.build_ddl_sharding_key(space, sharding_key, ddl_sharding_key, opts.bucket_id)
if sharding_key == nil then
return nil, err
end
end

local bucket_id = sharding.key_get_bucket_id(sharding_key, opts.bucket_id)
local call_opts = {
mode = opts.mode or 'read',
prefer_replica = opts.prefer_replica,
Expand Down
19 changes: 16 additions & 3 deletions crud/select/plan.lua
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ local errors = require('errors')
local compare_conditions = require('crud.compare.conditions')
local utils = require('crud.common.utils')
local dev_checks = require('crud.common.dev_checks')
local sharding = require('crud.common.sharding')

local compat = require('crud.common.compat')
local has_keydef = compat.exists('tuple.keydef', 'key_def')
Expand Down Expand Up @@ -48,7 +49,7 @@ local function get_index_for_condition(space_indexes, space_format, condition)
end
end

local function extract_sharding_key_from_scan_value(scan_value, scan_index, sharding_index)
local function extract_sharding_key_from_scan_value(scan_value, scan_index, sharding_index, space_format, space_name)
if #scan_value < #sharding_index.parts then
return nil
end
Expand All @@ -64,6 +65,11 @@ local function extract_sharding_key_from_scan_value(scan_value, scan_index, shar

-- check that sharding key is included in the scan index fields
local sharding_key = {}
local sharding_key_fieldno_map
local ddl_sharding_key = sharding.get_ddl_sharding_key(space_name)
if ddl_sharding_key ~= nil then
sharding_key_fieldno_map = utils.get_keys_fieldno_map(space_format, ddl_sharding_key)
end
for _, sharding_key_part in ipairs(sharding_index.parts) do
local fieldno = sharding_key_part.fieldno

Expand All @@ -79,7 +85,13 @@ local function extract_sharding_key_from_scan_value(scan_value, scan_index, shar
return nil
end

table.insert(sharding_key, field_value)
-- check if a field is a part of DDL sharding key
if ddl_sharding_key ~= nil and
sharding_key_fieldno_map[fieldno] == true then
table.insert(sharding_key, field_value)
else
table.insert(sharding_key, field_value)
end
end

return sharding_key
Expand Down Expand Up @@ -231,7 +243,8 @@ function select_plan.new(space, conditions, opts)
-- get sharding key value
local sharding_key
if scan_value ~= nil and (scan_iter == box.index.EQ or scan_iter == box.index.REQ) then
sharding_key = extract_sharding_key_from_scan_value(scan_value, scan_index, sharding_index)
sharding_key = extract_sharding_key_from_scan_value(scan_value, scan_index, sharding_index,
space_format, space_name)
end

if sharding_key ~= nil and opts.force_map_call ~= true then
Expand Down
11 changes: 10 additions & 1 deletion crud/update.lua
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,15 @@ local function call_update_on_router(space_name, key, user_operations, opts)
key = key:totable()
end

local ddl_sharding_key = sharding.get_ddl_sharding_key(space_name)
local sharding_key = key
if ddl_sharding_key ~= nil then
sharding_key, err = sharding.build_ddl_sharding_key(space, sharding_key, ddl_sharding_key, opts.bucket_id)
if sharding_key == nil then
return nil, err
end
end

local operations = user_operations
if not utils.tarantool_supports_fieldpaths() then
operations, err = utils.convert_operations(user_operations, space_format)
Expand All @@ -91,7 +100,7 @@ local function call_update_on_router(space_name, key, user_operations, opts)
end
end

local bucket_id = sharding.key_get_bucket_id(key, opts.bucket_id)
local bucket_id = sharding.key_get_bucket_id(sharding_key, opts.bucket_id)
local call_opts = {
mode = 'write',
timeout = opts.timeout,
Expand Down
Loading

0 comments on commit 3dccab0

Please sign in to comment.