-
Notifications
You must be signed in to change notification settings - Fork 5
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
The ddl.bucket_id() function needs to know a sharding function. It is costly to obtain the function declaration / definition stored in the _ddl_sharding_func space. This cache adds sharding function cache divided into two parts: raw and processed. Raw part is used for get_schema() method. Raw cache stored as is. Processed part is used for bucket_id(). Processed sharding_func cache entry may be: * table with parsed dot notation (like {'foo', 'bar'}) * function ready to call, this offloads using of loadstring() * string with an error Cache will be rebuilded if: * _ddl_sharding_func space changed: cache sets _ddl_sharding_func:on_replace trigger * schema changed: cache checks box.internal.schema_version changes This patch does not serve hot reload techniques. This entails an on_replace trigger duplication if hot reload occurs. Hot reload support will be done in separate task: #87 Closes #82
- Loading branch information
Showing
4 changed files
with
330 additions
and
11 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,136 @@ | ||
local fiber = require('fiber') | ||
|
||
local cache = nil | ||
|
||
local CACHE_LOCK_TIMEOUT = 3 | ||
local SPACE_NAME_IDX = 1 | ||
local SHARD_FUNC_NAME_IDX = 2 | ||
local SHARD_FUNC_BODY_IDX = 3 | ||
|
||
-- Function decorator that is used to prevent cache_build() from being | ||
-- called concurrently by different fibers. | ||
local function locked(f) | ||
return function(...) | ||
local ok = cache.lock:put(true, CACHE_LOCK_TIMEOUT) | ||
|
||
if not ok then | ||
error("cache lock timeout is exceeded") | ||
end | ||
|
||
local status, err = pcall(f, ...) | ||
cache.lock:get() | ||
|
||
if not status or err ~= nil then | ||
return err | ||
end | ||
end | ||
end | ||
|
||
-- Build cache. | ||
-- | ||
-- Cache structure format: | ||
-- cache = { | ||
-- spaces = { | ||
-- 'space_name' = { | ||
-- raw = {}, -- raw sharding metadata, used for ddl.get() | ||
-- processed = {} -- table with parsed dot notation (like {'foo', 'bar'}) | ||
-- -- or a function ready to call (or a string with an error) | ||
-- } | ||
-- }, | ||
-- lock -- locking based on fiber.channel() | ||
-- schema_version -- current schema version | ||
-- } | ||
-- | ||
-- function returns nothing | ||
local cache_build = locked(function() | ||
-- clear cache | ||
cache.spaces = {} | ||
|
||
if box.space._ddl_sharding_func == nil then | ||
return | ||
end | ||
|
||
for _, tuple in box.space._ddl_sharding_func:pairs() do | ||
local space_name = tuple[SPACE_NAME_IDX] | ||
local func_name = tuple[SHARD_FUNC_NAME_IDX] | ||
local func_body = tuple[SHARD_FUNC_BODY_IDX] | ||
|
||
cache.spaces[space_name] = { | ||
raw = tuple | ||
} | ||
|
||
if func_body ~= nil then | ||
local sharding_func, err = loadstring('return ' .. func_body) | ||
if sharding_func == nil then | ||
cache.spaces[space_name].processed = | ||
string.format("Body is incorrect in sharding_func for space (%s): %s", | ||
space_name, err) | ||
else | ||
cache.spaces[space_name].processed = | ||
sharding_func() | ||
end | ||
elseif func_name ~= nil then | ||
local chunks = string.split(func_name, '.') | ||
cache.spaces[space_name].processed = chunks | ||
end | ||
end | ||
|
||
end) | ||
|
||
-- Rebuild cache if _ddl_sharding_func space changed. | ||
local function cache_set_trigger() | ||
if box.space._ddl_sharding_func == nil then | ||
return | ||
end | ||
|
||
local trigger_found = false | ||
|
||
for _, func in pairs(box.space._ddl_sharding_func:on_replace()) do | ||
if func == cache_build then | ||
trigger_found = true | ||
break | ||
end | ||
end | ||
|
||
if not trigger_found then | ||
box.space._ddl_sharding_func:on_replace(cache_build) | ||
end | ||
end | ||
|
||
-- Get data from cache. | ||
-- Returns all cached data for "space_name" or nil. | ||
local function cache_get(space_name) | ||
if space_name == nil then | ||
return nil | ||
end | ||
|
||
-- using tarantool internal API. | ||
-- this is not reliable, but it is the only way to track | ||
-- schema_version changes. Fix it if a public method appears: | ||
-- https://github.com/tarantool/tarantool/issues/6544 | ||
local schema_version = box.internal.schema_version() | ||
|
||
if not cache then | ||
cache = { | ||
lock = fiber.channel(1) | ||
} | ||
cache_build() | ||
cache_set_trigger() | ||
cache.schema_version = schema_version | ||
end | ||
|
||
-- rebuild cache if database schema changed | ||
if schema_version ~= cache.schema_version then | ||
cache_build() | ||
cache_set_trigger() | ||
cache.schema_version = schema_version | ||
end | ||
|
||
return cache.spaces[space_name] | ||
end | ||
|
||
return { | ||
internal = { | ||
get = cache_get, | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,159 @@ | ||
#!/usr/bin/env tarantool | ||
|
||
local t = require('luatest') | ||
local db = require('test.db') | ||
local ddl = require('ddl') | ||
local cache = require('ddl.cache') | ||
local helper = require('test.helper') | ||
|
||
local SPACE_NAME_IDX = 1 | ||
local SHARD_FUNC_NAME_IDX = 2 | ||
local SHARD_FUNC_BODY_IDX = 3 | ||
|
||
local primary_index = { | ||
type = 'HASH', | ||
unique = true, | ||
parts = { | ||
{path = 'string_nonnull', is_nullable = false, type = 'string'}, | ||
{path = 'unsigned_nonnull', is_nullable = false, type = 'unsigned'}, | ||
}, | ||
name = 'primary' | ||
} | ||
|
||
local bucket_id_idx = { | ||
type = 'TREE', | ||
unique = false, | ||
parts = {{path = 'bucket_id', type = 'unsigned', is_nullable = false}}, | ||
name = 'bucket_id' | ||
} | ||
|
||
local func_body_first = 'function() return 42 end' | ||
local func_body_second = 'function() return 24 end' | ||
|
||
local function rebuild_db(g) | ||
db.drop_all() | ||
|
||
g.space = { | ||
engine = 'memtx', | ||
is_local = true, | ||
temporary = false, | ||
format = table.deepcopy(helper.test_space_format()) | ||
} | ||
table.insert(g.space.format, 1, { | ||
name = 'bucket_id', type = 'unsigned', is_nullable = false | ||
}) | ||
|
||
g.space.indexes = { | ||
table.deepcopy(primary_index), | ||
table.deepcopy(bucket_id_idx) | ||
} | ||
g.space.sharding_key = {'unsigned_nonnull', 'integer_nonnull'} | ||
g.schema = { | ||
spaces = { | ||
space = g.space, | ||
} | ||
} | ||
end | ||
|
||
local g = t.group() | ||
g.before_all(db.init) | ||
g.before_each(function() | ||
rebuild_db(g) | ||
end) | ||
|
||
function g.test_cache_processed_func_body() | ||
g.schema.spaces.space.sharding_func = { | ||
body = func_body_first | ||
} | ||
local ok, err = ddl.set_schema(g.schema) | ||
t.assert_equals(err, nil) | ||
t.assert_equals(ok, true) | ||
|
||
local res = cache.internal.get('space') | ||
t.assert(res) | ||
t.assert(res.processed) | ||
res = res.processed | ||
t.assert(type(res) == 'function') | ||
t.assert_equals(res(), 42) | ||
end | ||
|
||
function g.test_cache_processed_func_name() | ||
local sharding_func_name = 'sharding_func' | ||
rawset(_G, sharding_func_name, function(key) return key end) | ||
g.schema.spaces.space.sharding_func = sharding_func_name | ||
|
||
local ok, err = ddl.set_schema(g.schema) | ||
t.assert_equals(err, nil) | ||
t.assert_equals(ok, true) | ||
|
||
local res = cache.internal.get('space') | ||
t.assert(res) | ||
t.assert(res.processed) | ||
res = res.processed | ||
t.assert(type(res) == 'table') | ||
t.assert_equals(res[1], 'sharding_func') | ||
|
||
rawset(_G, sharding_func_name, nil) | ||
end | ||
|
||
function g.test_cache_schema_changed() | ||
g.schema.spaces.space.sharding_func = { | ||
body = func_body_first | ||
} | ||
local ok, err = ddl.set_schema(g.schema) | ||
t.assert_equals(err, nil) | ||
t.assert_equals(ok, true) | ||
|
||
local res = cache.internal.get('space') | ||
t.assert(res) | ||
t.assert(res.raw) | ||
res = res.raw | ||
t.assert_equals(res[SPACE_NAME_IDX], 'space') | ||
t.assert_equals(res[SHARD_FUNC_NAME_IDX], nil) | ||
t.assert_equals(res[SHARD_FUNC_BODY_IDX], func_body_first) | ||
|
||
rebuild_db(g) | ||
|
||
g.schema.spaces.space.sharding_func = { | ||
body = func_body_second | ||
} | ||
local ok, err = ddl.set_schema(g.schema) | ||
t.assert_equals(err, nil) | ||
t.assert_equals(ok, true) | ||
|
||
local res = cache.internal.get('space') | ||
t.assert(res) | ||
t.assert(res.raw) | ||
res = res.raw | ||
t.assert_equals(res[SPACE_NAME_IDX], 'space') | ||
t.assert_equals(res[SHARD_FUNC_NAME_IDX], nil) | ||
t.assert_equals(res[SHARD_FUNC_BODY_IDX], func_body_second) | ||
end | ||
|
||
function g.test_cache_space_updated() | ||
g.schema.spaces.space.sharding_func = { | ||
body = func_body_first | ||
} | ||
local ok, err = ddl.set_schema(g.schema) | ||
t.assert_equals(err, nil) | ||
t.assert_equals(ok, true) | ||
|
||
local res = cache.internal.get('space') | ||
t.assert(res) | ||
t.assert(res.raw) | ||
res = res.raw | ||
t.assert_equals(res[SPACE_NAME_IDX], 'space') | ||
t.assert_equals(res[SHARD_FUNC_NAME_IDX], nil) | ||
t.assert_equals(res[SHARD_FUNC_BODY_IDX], func_body_first) | ||
|
||
box.space._ddl_sharding_func | ||
:update({'space'}, {{'=', SHARD_FUNC_BODY_IDX, func_body_second}}) | ||
|
||
local res = cache.internal.get('space') | ||
t.assert(res) | ||
t.assert(res.raw) | ||
res = res.raw | ||
t.assert_equals(res[SPACE_NAME_IDX], 'space') | ||
t.assert_equals(res[SHARD_FUNC_NAME_IDX], nil) | ||
t.assert_equals(res[SHARD_FUNC_BODY_IDX], func_body_second) | ||
end |