Skip to content

Commit

Permalink
Use sharding keys to calculate bucket id (WIP)
Browse files Browse the repository at this point in the history
CRUD allows to automatically calculate `bucket_id` based on primary key
or one can specify `bucket_id` explicitly [1]. However it is often
required to calculate `bucket_id` using sharding keys created by DDL schema.

DDL module exposes space with sharding keys as a part of public API [2],
so everyone is allowed to set and get sharding keys there without adding
DDL module to dependencies.

Patch allows to calculate `bucket_id` value automatically when sharding keys
used in a sharded space.

1. #46
2. https://github.com/tarantool/ddl#api
3. #46 (comment)

Closes #166
  • Loading branch information
ligurio committed Jul 6, 2021
1 parent 275e312 commit e2331a6
Show file tree
Hide file tree
Showing 10 changed files with 297 additions and 10 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.

### Added

* Support calculating `bucket_id` based on `ddl.sharding_key`.
* Added jsonpath indexes support for queries
* `tuple-merger` module updated to 0.0.2

Expand Down
3 changes: 2 additions & 1 deletion crud/delete.lua
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,8 @@ end
--
-- @tparam ?number opts.bucket_id
-- Bucket ID
-- (by default, it's vshard.router.bucket_id_strcrc32 of primary key)
-- (by default, it's vshard.router.bucket_id_strcrc32 of primary key
-- or secondary key)
--
-- @return[1] object
-- @treturn[2] nil
Expand Down
3 changes: 2 additions & 1 deletion crud/get.lua
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,8 @@ end
--
-- @tparam ?number opts.bucket_id
-- Bucket ID
-- (by default, it's vshard.router.bucket_id_strcrc32 of primary key)
-- (by default, it's vshard.router.bucket_id_strcrc32 of primary
-- or secondary key)
--
-- @tparam ?boolean opts.prefer_replica
-- Call on replica if it's possible
Expand Down
3 changes: 2 additions & 1 deletion crud/insert.lua
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,8 @@ end
--
-- @tparam ?number opts.bucket_id
-- Bucket ID
-- (by default, it's vshard.router.bucket_id_strcrc32 of primary key)
-- (by default, it's vshard.router.bucket_id_strcrc32 of primary
-- or secondary key)
--
-- @return[1] tuple
-- @treturn[2] nil
Expand Down
3 changes: 2 additions & 1 deletion crud/replace.lua
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,8 @@ end
--
-- @tparam ?number opts.bucket_id
-- Bucket ID
-- (by default, it's vshard.router.bucket_id_strcrc32 of primary key)
-- (by default, it's vshard.router.bucket_id_strcrc32 of primary or
-- secondary key)
--
-- @return[1] object
-- @treturn[2] nil
Expand Down
23 changes: 19 additions & 4 deletions crud/select/plan.lua
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ local errors = require('errors')
local compare_conditions = require('crud.compare.conditions')
local utils = require('crud.common.utils')
local dev_checks = require('crud.common.dev_checks')
-- ddl module is optional
local _, ddl = pcall(require, 'ddl')

local compat = require('crud.common.compat')
local has_keydef = compat.exists('tuple.keydef', 'key_def')
Expand All @@ -16,6 +18,7 @@ local select_plan = {}

local IndexTypeError = errors.new_class('IndexTypeError', {capture_stack = false})
local FilterFieldsError = errors.new_class('FilterFieldsError', {capture_stack = false})
local DDLError = errors.new_class('DDLError', {capture_stack = false})

local function index_is_allowed(index)
return index.type == 'TREE'
Expand Down Expand Up @@ -226,12 +229,24 @@ function select_plan.new(space, conditions, opts)
end
end

local sharding_index = primary_index -- XXX: only sharding by primary key is supported
local sharding_index = primary_index

-- get sharding key value
-- getting sharding_key specified in DDL schema
local sharding_key
if scan_value ~= nil and (scan_iter == box.index.EQ or scan_iter == box.index.REQ) then
sharding_key = extract_sharding_key_from_scan_value(scan_value, scan_index, sharding_index)
if ddl ~= nil then
local schema, err = ddl.get_schema(space_name)
if schema == nil then
return nil, DDLError:new('Failed to obtain a DDL schema')
end
sharding_key = schema.sharding_key
end

-- getting sharding_key used in primary_index
if sharding_key == nil then
local sharding_index = primary_index
if scan_value ~= nil and (scan_iter == box.index.EQ or scan_iter == box.index.REQ) then
sharding_key = extract_sharding_key_from_scan_value(scan_value, scan_index, sharding_index)
end
end

if sharding_key ~= nil and opts.force_map_call ~= true then
Expand Down
3 changes: 2 additions & 1 deletion crud/update.lua
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,8 @@ end
--
-- @tparam ?number opts.bucket_id
-- Bucket ID
-- (by default, it's vshard.router.bucket_id_strcrc32 of primary key)
-- (by default, it's vshard.router.bucket_id_strcrc32 of primary or
-- secondary key)
--
-- @return[1] object
-- @treturn[2] nil
Expand Down
3 changes: 2 additions & 1 deletion crud/upsert.lua
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,8 @@ end
--
-- @tparam ?number opts.bucket_id
-- Bucket ID
-- (by default, it's vshard.router.bucket_id_strcrc32 of primary key)
-- (by default, it's vshard.router.bucket_id_strcrc32 of primary or
-- secondary key)
--
-- @return[1] tuple
-- @treturn[2] nil
Expand Down
80 changes: 80 additions & 0 deletions test/entrypoint/srv_ddl.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
#!/usr/bin/env tarantool

require('strict').on()
_G.is_initialized = function() return false end

local log = require('log')
local errors = require('errors')
local cartridge = require('cartridge')
local ddl = require('ddl')

package.preload['customers-storage'] = function()
return {
role_name = 'customers-storage',
init = function()
local engine = os.getenv('ENGINE') or 'memtx'
local schema_customers = {
is_local = true,
engine = engine,
temporary = false,
sharding_key = {
'name',
'age',
},
format = {
{name = 'id', type = 'unsigned', is_nullable = false},
{name = 'bucket_id', type = 'unsigned', is_nullable = false},
{name = 'name', type = 'string', is_nullable = false},
{name = 'age', type = 'number', is_nullable = false},
},
indexes = {
{
name = 'primary',
type = 'TREE',
unique = true,
parts = {
{path = 'id', type = 'unsigned', is_nullable = false},
},
},
{
name = 'bucket_id',
type = 'TREE',
unique = false,
parts = {
{path = 'bucket_id', type = 'unsigned', is_nullable = false},
},
},
}
}
local schema = {
spaces = {
['customers'] = schema_customers,
},
}
if not box.cfg.read_only then
local ok, err = ddl.set_schema(schema)
if not ok then
error(err)
end
end
end,
}
end

local ok, err = errors.pcall('CartridgeCfgError', cartridge.cfg, {
advertise_uri = 'localhost:3301',
http_port = 8081,
bucket_count = 3000,
roles = {
'customers-storage',
'cartridge.roles.crud-router',
'cartridge.roles.crud-storage',
},
})

if not ok then
log.error('%s', err)
os.exit(1)
end

_G.is_initialized = cartridge.is_healthy
185 changes: 185 additions & 0 deletions test/integration/ddl_sharding_key_test.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
#!/usr/bin/env tarantool

local fio = require('fio')
local t = require('luatest')
local crud = require('crud')

local helpers = require('test.helper')

local ok, ddl = pcall(require, 'ddl')
if not ok then
t.skip('Please, install ddl rock to run tests')
end

local pgroup = helpers.pgroup.new('ddl_sharding_key', {
engine = {'memtx', 'vinyl'},
})

pgroup:set_before_all(function(g)
g.cluster = helpers.Cluster:new({
datadir = fio.tempdir(),
server_command = helpers.entrypoint('srv_ddl'),
use_vshard = true,
replicasets = helpers.get_test_replicasets(),
env = {
['ENGINE'] = g.params.engine,
},
})
g.cluster:start()
local result, err = g.cluster.main_server.net_box:eval([[
local ddl = require('ddl')
local ok, err = ddl.get_schema()
return ok, err
]])
t.assert_equals(type(result), 'table')
t.assert_equals(err, nil)
end)

pgroup:set_after_all(function(g) helpers.stop_cluster(g.cluster) end)

pgroup:set_before_each(function(g)
helpers.truncate_space_on_cluster(g.cluster, 'customers')
end)

pgroup:add('test_key', function(g)
local result, err = g.cluster.main_server.net_box:call('crud.insert_object',
{'customers', {id = 33, name = 'Mayakovsky', age = 36}})
t.assert_equals(err, nil)
end)

pgroup:add('test_select', function(g)
t.skip('not implemented')
end)

pgroup:add('test_insert_get', function(g)
-- insert
local result, err = g.cluster.main_server.net_box:call('crud.insert',
{'customers', {2, box.NULL, 'Ivan', 20}})

t.assert_equals(err, nil)
t.assert_equals(result.metadata, {
{name = 'id', type = 'unsigned', is_nullable = false},
{name = 'bucket_id', type = 'unsigned', is_nullable = false},
{name = 'name', type = 'string', is_nullable = false},
{name = 'age', type = 'number', is_nullable = false},
})
t.assert_equals(result.rows, {{2, 401, 'Ivan', 20}})

-- get
local result, err = g.cluster.main_server.net_box:call('crud.get', {'customers', 2})

t.assert_equals(err, nil)
t.assert(result ~= nil)
t.assert_equals(result.rows, {{2, 401, 'Ivan', 20}})
end)

pgroup:add('test_update', function(g)
-- insert tuple
local result, err = g.cluster.main_server.net_box:call(
'crud.insert_object', {'customers', {id = 22, name = 'Leo', age = 72}})

t.assert_equals(err, nil)
t.assert_equals(result.metadata, {
{name = 'id', type = 'unsigned', is_nullable = false},
{name = 'bucket_id', type = 'unsigned', is_nullable = false},
{name = 'name', type = 'string', is_nullable = false},
{name = 'age', type = 'number', is_nullable = false},
})
local objects = crud.unflatten_rows(result.rows, result.metadata)
t.assert_equals(objects, {{id = 22, name = 'Leo', age = 72, bucket_id = 655}})

-- update age and name fields
local result, err = g.cluster.main_server.net_box:call('crud.update', {'customers', 22, {
{'+', 'age', 10},
{'=', 'name', 'Leo Tolstoy'},
}})

t.assert_equals(err, nil)
local objects = crud.unflatten_rows(result.rows, result.metadata)
t.assert_equals(objects, {{id = 22, name = 'Leo Tolstoy', age = 82, bucket_id = 655}})

-- get
local result, err = g.cluster.main_server.net_box:call('crud.get', {'customers', 22})

t.assert_equals(err, nil)
local objects = crud.unflatten_rows(result.rows, result.metadata)
t.assert_equals(objects, {{id = 22, name = 'Leo Tolstoy', age = 82, bucket_id = 655}})
end)

pgroup:add('test_delete', function(g)
-- insert tuple
local result, err = g.cluster.main_server.net_box:call(
'crud.insert_object', {'customers', {id = 33, name = 'Mayakovsky', age = 36}})

t.assert_equals(err, nil)
t.assert_equals(result.metadata, {
{name = 'id', type = 'unsigned', is_nullable = false},
{name = 'bucket_id', type = 'unsigned', is_nullable = false},
{name = 'name', type = 'string', is_nullable = false},
{name = 'age', type = 'number', is_nullable = false},
})
local objects = crud.unflatten_rows(result.rows, result.metadata)
t.assert_equals(objects, {{id = 33, name = 'Mayakovsky', age = 36, bucket_id = 907}})

-- delete
local result, err = g.cluster.main_server.net_box:call('crud.delete', {'customers', 33})

t.assert_equals(err, nil)
if g.params.engine == 'memtx' then
local objects = crud.unflatten_rows(result.rows, result.metadata)
t.assert_equals(objects, {{id = 33, name = 'Mayakovsky', age = 36, bucket_id = 907}})
else
t.assert_equals(#result.rows, 0)
end

-- get
local result, err = g.cluster.main_server.net_box:call('crud.get', {'customers', 33})

t.assert_equals(err, nil)
t.assert_equals(#result.rows, 0)
end)

pgroup:add('test_replace', function(g)
local result, err = g.cluster.main_server.net_box:call(
'crud.replace', {'customers', {45, box.NULL, 'John Fedor', 99}})

t.assert_equals(err, nil)
t.assert_equals(result.metadata, {
{name = 'id', type = 'unsigned', is_nullable = false},
{name = 'bucket_id', type = 'unsigned', is_nullable = false},
{name = 'name', type = 'string', is_nullable = false},
{name = 'age', type = 'number', is_nullable = false},
})
t.assert_equals(result.rows, {{45, 392, 'John Fedor', 99}})

-- replace again
local result, err = g.cluster.main_server.net_box:call(
'crud.replace', {'customers', {45, box.NULL, 'John Fedor', 100}})

t.assert_equals(err, nil)
t.assert_equals(result.rows, {{45, 392, 'John Fedor', 100}})
end)

pgroup:add('test_upsert', function(g)
-- upsert tuple first time
local result, err = g.cluster.main_server.net_box:call('crud.upsert',
{'customers', {67, box.NULL, 'Saltykov-Shchedrin', 63}, {
{'=', 'name', 'Mikhail Saltykov-Shchedrin'},
}})

t.assert_equals(#result.rows, 0)
t.assert_equals(result.metadata, {
{name = 'id', type = 'unsigned', is_nullable = false},
{name = 'bucket_id', type = 'unsigned', is_nullable = false},
{name = 'name', type = 'string', is_nullable = false},
{name = 'age', type = 'number', is_nullable = false},
})
t.assert_equals(err, nil)

-- get
local result, err = g.cluster.main_server.net_box:call('crud.get', {'customers', 67})

t.assert_equals(err, nil)
t.assert_equals(result.rows, {{67, 1143, 'Saltykov-Shchedrin', 63}})
end)

0 comments on commit e2331a6

Please sign in to comment.