The ddl.bucket_id() function needs to know the sharding function, and it is costly to obtain the function declaration/definition stored in the _ddl_sharding_func space on every call. This commit adds a sharding function cache divided into two parts: raw and processed. The raw part is stored as is and is used by the get_space_schema() method. The processed part holds a function that is ready to call and is used by bucket_id(). Closes #82
Showing 4 changed files with 322 additions and 12 deletions.
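For illustration, here is a minimal sketch of how a caller such as ddl.bucket_id() might consume the processed part of this cache. The helper resolve_sharding_func and its error messages are assumptions made for the example, not code from this change:

-- Hypothetical consumer sketch: turn the cached "processed" entry for a space
-- into a callable sharding function, or return an error string.
local cache = require('ddl.cache')

local function resolve_sharding_func(space_name)
    local sharding = cache.internal.get(space_name)
    if sharding == nil or sharding.sharding_func == nil then
        return nil, string.format('no sharding function defined for space %q', space_name)
    end

    local processed = sharding.sharding_func.processed
    if type(processed) == 'function' then
        -- Defined by body: the cache has already compiled it, ready to call.
        return processed
    elseif type(processed) == 'string' then
        -- The cache stored an error message instead of a function.
        return nil, processed
    elseif type(processed) == 'table' then
        -- Defined by name: walk the dotted path split into chunks,
        -- e.g. {'some_module', 'bucket_id'} (hypothetical name).
        local func = _G
        for _, chunk in ipairs(processed) do
            if type(func) ~= 'table' then
                func = nil
                break
            end
            func = func[chunk]
        end
        if type(func) == 'function' then
            return func
        end
    end
    return nil, string.format('sharding function for space %q is not callable', space_name)
end

-- Usage sketch: local func, err = resolve_sharding_func('space')
-- if func then local bucket_id = func(sharding_key) end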
@@ -0,0 +1,108 @@
local fiber = require('fiber')

local cache = nil

local CACHE_LOCK_TIMEOUT = 3
local SPACE_NAME_IDX = 1
local SHARD_FUNC_NAME_IDX = 2
local SHARD_FUNC_BODY_IDX = 3

-- Build cache and setup "on_replace" trigger.
--
-- Cache structure format:
-- cache = {
--     spaces = {
--         ['space_name'] = {
--             sharding_func = {
--                 raw = {},       -- raw sharding metadata, used for ddl.get()
--                 processed = {}, -- sharding function that is ready to call, or a string with an error
--             }
--         }
--     },
--     fetch_lock -- locking based on fiber.channel()
-- }
local function cache_build()
    -- prevent cache_build() from being called concurrently by different fibers
    local unlocked = cache.fetch_lock:put(true, CACHE_LOCK_TIMEOUT)
    local trigger_found = false

    if not unlocked then
        return
    end

    -- clear cache
    cache.spaces = {}

    if box.space._ddl_sharding_func == nil then
        cache.fetch_lock:get()
        return nil
    end

    for _, tuple in box.space._ddl_sharding_func:pairs() do
        local space_name = tuple[SPACE_NAME_IDX]
        local func_name = tuple[SHARD_FUNC_NAME_IDX]
        local func_body = tuple[SHARD_FUNC_BODY_IDX]

        cache.spaces[space_name] = {}
        cache.spaces[space_name].sharding_func = { raw = tuple }

        if func_body ~= nil then
            local sharding_func, err = loadstring('return ' .. func_body)
            if sharding_func == nil then
                cache.spaces[space_name].sharding_func.processed =
                    string.format("Body is incorrect in sharding_func for space (%s): %s",
                                  space_name, err)
            else
                cache.spaces[space_name].sharding_func.processed =
                    sharding_func()
            end
        elseif func_name ~= nil then
            local chunks = string.split(func_name, '.')
            cache.spaces[space_name].sharding_func.processed = chunks
        end
    end

    cache._schema_version = box.internal.schema_version()

    for _, func in pairs(box.space._ddl_sharding_func:on_replace()) do
        if func == cache_build then
            trigger_found = true
            break
        end
    end

    if not trigger_found then
        box.space._ddl_sharding_func:on_replace(cache_build)
    end

    cache.fetch_lock:get()
end

-- Get data from cache.
-- Returns all cached data for "space_name".
local function cache_get(space_name)
    if space_name == nil then
        return nil
    end

    local schema_version = box.internal.schema_version()

    if not cache then
        cache = {
            spaces = {},
            fetch_lock = fiber.channel(1)
        }
        cache_build()
    end

    -- rebuild cache if database schema changed
    if schema_version ~= cache._schema_version then
        cache_build()
    end

    return cache.spaces[space_name]
end

return {
    internal = {
        get = cache_get,
    }
}
@@ -0,0 +1,179 @@
#!/usr/bin/env tarantool

local t = require('luatest')
local db = require('test.db')
local ddl = require('ddl')
local cache = require('ddl.cache')

local SPACE_NAME_IDX = 1
local SHARD_FUNC_NAME_IDX = 2
local SHARD_FUNC_BODY_IDX = 3

local test_space = {
    engine = 'memtx',
    is_local = true,
    temporary = false,
    format = {
        {name = 'unsigned_nonnull', type = 'unsigned', is_nullable = false},
        {name = 'unsigned_nullable', type = 'unsigned', is_nullable = true},
        {name = 'integer_nonnull', type = 'integer', is_nullable = false},
        {name = 'integer_nullable', type = 'integer', is_nullable = true},
        {name = 'number_nonnull', type = 'number', is_nullable = false},
        {name = 'number_nullable', type = 'number', is_nullable = true},
        {name = 'boolean_nonnull', type = 'boolean', is_nullable = false},
        {name = 'boolean_nullable', type = 'boolean', is_nullable = true},
        {name = 'string_nonnull', type = 'string', is_nullable = false},
        {name = 'string_nullable', type = 'string', is_nullable = true},
        {name = 'scalar_nonnull', type = 'scalar', is_nullable = false},
        {name = 'scalar_nullable', type = 'scalar', is_nullable = true},
        {name = 'array_nonnull', type = 'array', is_nullable = false},
        {name = 'array_nullable', type = 'array', is_nullable = true},
        {name = 'map_nonnull', type = 'map', is_nullable = false},
        {name = 'map_nullable', type = 'map', is_nullable = true},
        {name = 'any_nonnull', type = 'any', is_nullable = false},
        {name = 'any_nullable', type = 'any', is_nullable = true},
    },
}

local primary_index = {
    type = 'HASH',
    unique = true,
    parts = {
        {path = 'string_nonnull', is_nullable = false, type = 'string'},
        {path = 'unsigned_nonnull', is_nullable = false, type = 'unsigned'},
    },
    name = 'primary'
}

local bucket_id_idx = {
    type = 'TREE',
    unique = false,
    parts = {{path = 'bucket_id', type = 'unsigned', is_nullable = false}},
    name = 'bucket_id'
}

local func_body_first = 'function() return 42 end'
local func_body_second = 'function() return 24 end'

local function rebuild_db(g)
    db.drop_all()

    g.space = table.deepcopy(test_space)
    table.insert(g.space.format, 1, {
        name = 'bucket_id', type = 'unsigned', is_nullable = false
    })

    g.space.indexes = {
        table.deepcopy(primary_index),
        table.deepcopy(bucket_id_idx)
    }
    g.space.sharding_key = {'unsigned_nonnull', 'integer_nonnull'}
    g.schema = {
        spaces = {
            space = g.space,
        }
    }
end

local g = t.group()
g.before_all(db.init)
g.before_each(function()
    rebuild_db(g)
end)

function g.test_cache_processed_func_body()
    g.schema.spaces.space.sharding_func = {
        body = func_body_first
    }
    local ok, err = ddl.set_schema(g.schema)
    t.assert_equals(err, nil)
    t.assert_equals(ok, true)

    local res = cache.internal.get('space')
    t.assert(res)
    t.assert(res.sharding_func.processed)
    res = res.sharding_func.processed
    t.assert(type(res) == 'function')
    t.assert_equals(res(), 42)
end

function g.test_cache_processed_func_name()
    local sharding_func_name = 'sharding_func'
    rawset(_G, sharding_func_name, function(key) return key end)
    g.schema.spaces.space.sharding_func = sharding_func_name

    local ok, err = ddl.set_schema(g.schema)
    t.assert_equals(err, nil)
    t.assert_equals(ok, true)

    local res = cache.internal.get('space')
    t.assert(res)
    t.assert(res.sharding_func.processed)
    res = res.sharding_func.processed
    t.assert(type(res) == 'table')
    t.assert_equals(res[1], 'sharding_func')

    rawset(_G, sharding_func_name, nil)
end

function g.test_cache_schema_changed()
    g.schema.spaces.space.sharding_func = {
        body = func_body_first
    }
    local ok, err = ddl.set_schema(g.schema)
    t.assert_equals(err, nil)
    t.assert_equals(ok, true)

    local res = cache.internal.get('space')
    t.assert(res)
    t.assert(res.sharding_func.raw)
    res = res.sharding_func.raw
    t.assert_equals(res[SPACE_NAME_IDX], 'space')
    t.assert_equals(res[SHARD_FUNC_NAME_IDX], nil)
    t.assert_equals(res[SHARD_FUNC_BODY_IDX], func_body_first)

    rebuild_db(g)

    g.schema.spaces.space.sharding_func = {
        body = func_body_second
    }
    local ok, err = ddl.set_schema(g.schema)
    t.assert_equals(err, nil)
    t.assert_equals(ok, true)

    local res = cache.internal.get('space')
    t.assert(res)
    t.assert(res.sharding_func.raw)
    res = res.sharding_func.raw
    t.assert_equals(res[SPACE_NAME_IDX], 'space')
    t.assert_equals(res[SHARD_FUNC_NAME_IDX], nil)
    t.assert_equals(res[SHARD_FUNC_BODY_IDX], func_body_second)
end

function g.test_cache_space_updated()
    g.schema.spaces.space.sharding_func = {
        body = func_body_first
    }
    local ok, err = ddl.set_schema(g.schema)
    t.assert_equals(err, nil)
    t.assert_equals(ok, true)

    local res = cache.internal.get('space')
    t.assert(res)
    t.assert(res.sharding_func.raw)
    res = res.sharding_func.raw
    t.assert_equals(res[SPACE_NAME_IDX], 'space')
    t.assert_equals(res[SHARD_FUNC_NAME_IDX], nil)
    t.assert_equals(res[SHARD_FUNC_BODY_IDX], func_body_first)

    box.space._ddl_sharding_func
        :update({'space'}, {{'=', SHARD_FUNC_BODY_IDX, func_body_second}})

    local res = cache.internal.get('space')
    t.assert(res)
    t.assert(res.sharding_func.raw)
    res = res.sharding_func.raw
    t.assert_equals(res[SPACE_NAME_IDX], 'space')
    t.assert_equals(res[SHARD_FUNC_NAME_IDX], nil)
    t.assert_equals(res[SHARD_FUNC_BODY_IDX], func_body_second)
end