Skip to content

Commit

Permalink
get sharding keys from a space
Browse files Browse the repository at this point in the history
  • Loading branch information
ligurio committed Jul 30, 2021
1 parent 0b9e6b7 commit 9a8dab1
Show file tree
Hide file tree
Showing 6 changed files with 22 additions and 63 deletions.
35 changes: 17 additions & 18 deletions crud/common/sharding.lua
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,7 @@ local ShardingKeyError = errors.new_class("ShardingKeyError", {capture_stack = f
local dev_checks = require('crud.common.dev_checks')
local utils = require('crud.common.utils')

-- TODO: invalidate ddl_schema_cache
local ddl_schema_cache = nil
local ddl_sharding_keys_cache = {}
local sharding_key_in_primary_index = {}

local sharding = {}
Expand All @@ -32,7 +31,7 @@ function sharding.tuple_get_bucket_id(tuple, space, specified_bucket_id)

local primary_index = space.index[0]
local key
local sharding_key = sharding.get_ddl_sharding_key(space.name)
local sharding_key = sharding.get_ddl_sharding_key(space.name, vshard.router.routeall())
if sharding_key ~= nil then
local sharding_keys_fieldnos = utils.get_keys_fieldnos(space:format(), sharding_key)
if sharding_keys_fieldnos ~= nil then
Expand Down Expand Up @@ -74,23 +73,23 @@ function sharding.tuple_set_and_return_bucket_id(tuple, space, specified_bucket_
return bucket_id
end

-- Get sharding key (actually field names) using DDL module.
function sharding.get_ddl_sharding_key(space_name)
dev_checks('string')
if ddl == false then
return nil
end
if ddl_schema_cache == nil then
ddl_schema_cache = ddl.get_schema()
end
assert(type(ddl_schema_cache.spaces), 'table')
local space_schema = ddl_schema_cache.spaces[space_name]
if space_schema == nil or
space_schema.sharding_key == nil then
return nil
-- Get sharding key (actually field names).
-- Returns a table with field names or nil.
function sharding.get_ddl_sharding_key(space_name, replicasets)
dev_checks('string', 'table')

if ddl_sharding_keys_cache[space_name] == nil then
local replicaset = select(2, next(replicasets))
local conn_master = replicaset.master.conn
local sharding_keys = conn_master.space._ddl_sharding_key
if sharding_keys == nil then
return nil
end
local record = conn_master.space._ddl_sharding_key:get{space_name}
ddl_sharding_keys_cache[space_name] = record and record.sharding_keys
end

return space_schema.sharding_key
return ddl_sharding_keys_cache[space_name]
end

-- Make sure fields used in sharding key are present in primary index.
Expand Down
2 changes: 1 addition & 1 deletion crud/delete.lua
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ local function call_delete_on_router(space_name, key, opts)
key = key:totable()
end

local ddl_sharding_key = sharding.get_ddl_sharding_key(space_name)
local ddl_sharding_key = sharding.get_ddl_sharding_key(space_name, vshard.router.routeall())
local sharding_key = key
if ddl_sharding_key ~= nil then
local err
Expand Down
2 changes: 1 addition & 1 deletion crud/get.lua
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ local function call_get_on_router(space_name, key, opts)
key = key:totable()
end

local ddl_sharding_key = sharding.get_ddl_sharding_key(space_name)
local ddl_sharding_key = sharding.get_ddl_sharding_key(space_name, vshard.router.routeall())
local sharding_key = key
if ddl_sharding_key ~= nil then
local err
Expand Down
3 changes: 2 additions & 1 deletion crud/select/plan.lua
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ local compare_conditions = require('crud.compare.conditions')
local utils = require('crud.common.utils')
local dev_checks = require('crud.common.dev_checks')
local sharding = require('crud.common.sharding')
local vshard = require('vshard')

local compat = require('crud.common.compat')
local has_keydef = compat.exists('tuple.keydef', 'key_def')
Expand Down Expand Up @@ -66,7 +67,7 @@ local function extract_sharding_key_from_scan_value(scan_value, scan_index, shar
-- check that sharding key is included in the scan index fields
local sharding_key = {}
local sharding_key_fieldno_map
local ddl_sharding_key = sharding.get_ddl_sharding_key(space_name)
local ddl_sharding_key = sharding.get_ddl_sharding_key(space_name, vshard.router.routeall())
if ddl_sharding_key ~= nil then
sharding_key_fieldno_map = utils.get_keys_fieldno_map(space_format, ddl_sharding_key)
end
Expand Down
2 changes: 1 addition & 1 deletion crud/update.lua
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ local function call_update_on_router(space_name, key, user_operations, opts)
key = key:totable()
end

local ddl_sharding_key = sharding.get_ddl_sharding_key(space_name)
local ddl_sharding_key = sharding.get_ddl_sharding_key(space_name, vshard.router.routeall())
local sharding_key = key
if ddl_sharding_key ~= nil then
sharding_key, err = sharding.build_ddl_sharding_key(space, sharding_key, ddl_sharding_key, opts.bucket_id)
Expand Down
41 changes: 0 additions & 41 deletions test/integration/ddl_sharding_key_test.lua
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,6 @@ if not ok then
t.skip('Lua module ddl is required to run test')
end

local function srv_call(srv, fn_name, ...)
return srv.net_box:eval([[
local fn_name, args = ...
local cartridge = require('cartridge')
local ddl_manager = cartridge.service_get('ddl-manager')
return ddl_manager[fn_name](unpack(args))
]], {fn_name, {...}})
end

local pgroup = helpers.pgroup.new('ddl_sharding_key', {
engine = {'memtx', 'vinyl'},
})
Expand All @@ -42,38 +33,6 @@ pgroup:set_before_all(function(g)
t.assert_equals(type(result), 'table')
t.assert_equals(err, nil)

local schema = {
spaces = {
customers = {
engine = g.params.engine,
is_local = true,
temporary = false,
format = {
{name = 'id', is_nullable = false, type = 'unsigned'},
{name = 'bucket_id', is_nullable = false, type = 'unsigned'},
{name = 'name', is_nullable = false, type = 'string'},
{name = 'age', is_nullable = false, type = 'number'},
},
indexes = {{
name = 'id',
type = 'TREE',
unique = true,
parts = {
{path = 'id', is_nullable = false, type = 'unsigned'},
},
}, {
name = 'bucket_id',
type = 'TREE',
unique = false,
parts = {
{path = 'bucket_id', is_nullable = false, type = 'unsigned'},
}
}},
sharding_key = {'id', 'name'},
},
}
}
srv_call(g.cluster.main_server, 'set_clusterwide_schema_lua', schema)
g.space_format = g.cluster.servers[2].net_box.space.customers:format()
end)

Expand Down

0 comments on commit 9a8dab1

Please sign in to comment.