Skip to content

Commit

Permalink
Add caching of sharding function
Browse files Browse the repository at this point in the history
The ddl.bucket_id() function needs to know a sharding function.
It is costly to obtain the function declaration /
definition stored in the _ddl_sharding_func space.

This cache adds sharding function cache divided into two parts:
raw and processed. Raw part is used for get_schema() method.
Raw cache stored as is. Processed part is used for bucket_id().

Processed sharding_func cache entry may be:
* table with parsed dot notation (like {'foo', 'bar'})
* function ready to call, this offloads using of loadstring()
* string with an error

Cache will be rebuilded if:
* _ddl_sharding_func space changed: cache sets _ddl_sharding_func:on_replace
  trigger
* schema changed: cache checks box.internal.schema_version changes

This patch does not serve hot reload techniques.
This entails an on_replace trigger duplication if hot reload occurs.
Hot reload support will be done in separate task:
#87

Closes #82
  • Loading branch information
0x501D committed Dec 30, 2021
1 parent 4f0fbd1 commit 878ee46
Show file tree
Hide file tree
Showing 4 changed files with 322 additions and 11 deletions.
136 changes: 136 additions & 0 deletions ddl/cache.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
local fiber = require('fiber')

local cache = nil

local CACHE_LOCK_TIMEOUT = 3
local SPACE_NAME_IDX = 1
local SHARD_FUNC_NAME_IDX = 2
local SHARD_FUNC_BODY_IDX = 3

-- Function decorator that is used to prevent cache_build() from being
-- called concurrently by different fibers.
local function locked(f)
return function(...)
local ok = cache.lock:put(true, CACHE_LOCK_TIMEOUT)

if not ok then
error("cache lock timeout is exceeded")
end

local status, err = pcall(f, ...)
cache.lock:get()

if not status or err ~= nil then
return err
end
end
end

-- Build cache.
--
-- Cache structure format:
-- cache = {
-- spaces = {
-- 'space_name' = {
-- raw = {}, -- raw sharding metadata, used for ddl.get()
-- processed = {}, -- table with parsed dot notation (like {'foo', 'bar'})
-- -- or a function ready to call (or a string with an error)
-- }
-- },
-- lock, -- locking based on fiber.channel()
-- schema_version, -- current schema version
-- }
--
-- function returns nothing
local cache_build = locked(function()
-- clear cache
cache.spaces = {}

if box.space._ddl_sharding_func == nil then
return
end

for _, tuple in box.space._ddl_sharding_func:pairs() do
local space_name = tuple[SPACE_NAME_IDX]
local func_name = tuple[SHARD_FUNC_NAME_IDX]
local func_body = tuple[SHARD_FUNC_BODY_IDX]

cache.spaces[space_name] = {
raw = tuple
}

if func_body ~= nil then
local sharding_func, err = loadstring('return ' .. func_body)
if sharding_func == nil then
cache.spaces[space_name].processed =
string.format("Body is incorrect in sharding_func for space (%s): %s",
space_name, err)
else
cache.spaces[space_name].processed =
sharding_func()
end
elseif func_name ~= nil then
local chunks = string.split(func_name, '.')
cache.spaces[space_name].processed = chunks
end
end

end)

-- Rebuild cache if _ddl_sharding_func space changed.
local function cache_set_trigger()
if box.space._ddl_sharding_func == nil then
return
end

local trigger_found = false

for _, func in pairs(box.space._ddl_sharding_func:on_replace()) do
if func == cache_build then
trigger_found = true
break
end
end

if not trigger_found then
box.space._ddl_sharding_func:on_replace(cache_build)
end
end

-- Get data from cache.
-- Returns all cached data for "space_name" or nil.
local function cache_get(space_name)
if space_name == nil then
return nil
end

-- using tarantool internal API.
-- this is not reliable, but it is the only way to track
-- schema_version changes. Fix it if a public method appears:
-- https://github.com/tarantool/tarantool/issues/6544
local schema_version = box.internal.schema_version()

if not cache then
cache = {
lock = fiber.channel(1)
}
cache_build()
cache_set_trigger()
cache.schema_version = schema_version
end

-- rebuild cache if database schema changed
if schema_version ~= cache.schema_version then
cache_build()
cache_set_trigger()
cache.schema_version = schema_version
end

return cache.spaces[space_name]
end

return {
internal = {
get = cache_get,
}
}
34 changes: 24 additions & 10 deletions ddl/get.lua
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
local utils = require('ddl.utils')
local cache = require('ddl.cache')
local ddl_check = require('ddl.check')

local function _get_index_field_path(space, index_part)
Expand Down Expand Up @@ -66,11 +67,24 @@ local function get_metadata(space_name, metadata_name)
end

local function get_sharding_func(space_name)
local record = get_metadata(space_name, "sharding_func")
local record = cache.internal.get(space_name)

if not record then
return nil
end

return record.processed
end

local function get_sharding_func_raw(space_name)
local record = cache.internal.get(space_name)

if not record or not record.raw then
return nil
end

record = record.raw

if record.sharding_func_body ~= nil then
return {body = record.sharding_func_body}
end
Expand All @@ -97,7 +111,7 @@ local function get_space_schema(space_name)
space_ddl.engine = box_space.engine
space_ddl.format = box_space:format()
space_ddl.sharding_key = get_sharding_key(space_name)
space_ddl.sharding_func = get_sharding_func(space_name)
space_ddl.sharding_func = get_sharding_func_raw(space_name)
for _, field in ipairs(space_ddl.format) do
if field.is_nullable == nil then
field.is_nullable = false
Expand All @@ -115,21 +129,21 @@ local function get_space_schema(space_name)
end

local function prepare_sharding_func_for_call(space_name, sharding_func_def)
if type(sharding_func_def) == 'string' then
if type(sharding_func_def) == 'table' then
local sharding_func = utils.get_G_function(sharding_func_def)
if sharding_func ~= nil and
ddl_check.internal.is_callable(sharding_func) == true then
return sharding_func
end
end

if type(sharding_func_def) == 'table' then
local sharding_func, err = loadstring('return ' .. sharding_func_def.body)
if sharding_func == nil then
return nil, string.format(
"Body is incorrect in sharding_func for space (%s): %s", space_name, err)
end
return sharding_func()
if type(sharding_func_def) == 'function' then
return sharding_func_def
end

-- error from cache
if type(sharding_func_def) == 'string' then
return nil, sharding_func_def
end

return nil, string.format(
Expand Down
12 changes: 11 additions & 1 deletion ddl/utils.lua
Original file line number Diff line number Diff line change
Expand Up @@ -189,9 +189,19 @@ end
-- split sharding func name in dot notation by dot
-- foo.bar.baz -> chunks: foo bar baz
-- foo -> chunks: foo
--
-- func_name parameter may be a string in dot notation or table
-- if func_name type is of type table it is assumed that it is already split
local function get_G_function(func_name)
local chunks = string.split(func_name, '.')
local sharding_func = _G
local chunks

if type(func_name) == 'string' then
chunks = string.split(func_name, '.')
else
chunks = func_name
end

-- check is the each chunk an identifier
for _, chunk in pairs(chunks) do
if not check_name_isident(chunk) or sharding_func == nil then
Expand Down
151 changes: 151 additions & 0 deletions test/cache_test.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
#!/usr/bin/env tarantool

local t = require('luatest')
local db = require('test.db')
local ddl = require('ddl')
local cache = require('ddl.cache')
local helper = require('test.helper')

local SPACE_NAME_IDX = 1
local SHARD_FUNC_NAME_IDX = 2
local SHARD_FUNC_BODY_IDX = 3

local primary_index = {
type = 'HASH',
unique = true,
parts = {
{path = 'string_nonnull', is_nullable = false, type = 'string'},
{path = 'unsigned_nonnull', is_nullable = false, type = 'unsigned'},
},
name = 'primary'
}

local bucket_id_idx = {
type = 'TREE',
unique = false,
parts = {{path = 'bucket_id', type = 'unsigned', is_nullable = false}},
name = 'bucket_id'
}

local func_body_first = 'function() return 42 end'
local func_body_second = 'function() return 24 end'

local function space_init(g)
db.drop_all()

g.space = {
engine = 'memtx',
is_local = true,
temporary = false,
format = table.deepcopy(helper.test_space_format())
}
table.insert(g.space.format, 1, {
name = 'bucket_id', type = 'unsigned', is_nullable = false
})

g.space.indexes = {
table.deepcopy(primary_index),
table.deepcopy(bucket_id_idx)
}
g.space.sharding_key = {'unsigned_nonnull', 'integer_nonnull'}
g.schema = {
spaces = {
space = g.space,
}
}
end

local g = t.group()
g.before_all(db.init)
g.before_each(space_init)

function g.test_cache_processed_func_body()
g.schema.spaces.space.sharding_func = {
body = func_body_first
}
local ok, err = ddl.set_schema(g.schema)
t.assert_equals(err, nil)
t.assert_equals(ok, true)

local res = cache.internal.get('space')
t.assert(res)
t.assert(res.processed)
t.assert(type(res.processed) == 'function')
t.assert_equals(res.processed(), 42)
end

function g.test_cache_processed_func_name()
local sharding_func_name = 'sharding_func'
rawset(_G, sharding_func_name, function(key) return key end)
g.schema.spaces.space.sharding_func = sharding_func_name

local ok, err = ddl.set_schema(g.schema)
t.assert_equals(err, nil)
t.assert_equals(ok, true)

local res = cache.internal.get('space')
t.assert(res)
t.assert(res.processed)
t.assert(type(res.processed) == 'table')
t.assert_equals(res.processed[1], 'sharding_func')

rawset(_G, sharding_func_name, nil)
end

function g.test_cache_schema_changed()
g.schema.spaces.space.sharding_func = {
body = func_body_first
}
local ok, err = ddl.set_schema(g.schema)
t.assert_equals(err, nil)
t.assert_equals(ok, true)

local res = cache.internal.get('space')
t.assert(res)
t.assert(res.raw)
t.assert_equals(res.raw[SPACE_NAME_IDX], 'space')
t.assert_equals(res.raw[SHARD_FUNC_NAME_IDX], nil)
t.assert_equals(res.raw[SHARD_FUNC_BODY_IDX], func_body_first)

space_init(g)

g.schema.spaces.space.sharding_func = {
body = func_body_second
}
local ok, err = ddl.set_schema(g.schema)
t.assert_equals(err, nil)
t.assert_equals(ok, true)

local res = cache.internal.get('space')
t.assert(res)
t.assert(res.raw)
t.assert_equals(res.raw[SPACE_NAME_IDX], 'space')
t.assert_equals(res.raw[SHARD_FUNC_NAME_IDX], nil)
t.assert_equals(res.raw[SHARD_FUNC_BODY_IDX], func_body_second)
end

function g.test_cache_space_updated()
g.schema.spaces.space.sharding_func = {
body = func_body_first
}
local ok, err = ddl.set_schema(g.schema)
t.assert_equals(err, nil)
t.assert_equals(ok, true)

local res = cache.internal.get('space')
t.assert(res)
t.assert(res.raw)
t.assert_equals(res.raw[SPACE_NAME_IDX], 'space')
t.assert_equals(res.raw[SHARD_FUNC_NAME_IDX], nil)
t.assert_equals(res.raw[SHARD_FUNC_BODY_IDX], func_body_first)

box.space._ddl_sharding_func
:update({'space'}, {{'=', SHARD_FUNC_BODY_IDX, func_body_second}})

local res = cache.internal.get('space')
t.assert(res)
t.assert(res.raw)
t.assert_equals(res.raw[SPACE_NAME_IDX], 'space')
t.assert_equals(res.raw[SHARD_FUNC_NAME_IDX], nil)
t.assert_equals(res.raw[SHARD_FUNC_BODY_IDX], func_body_second)
end

0 comments on commit 878ee46

Please sign in to comment.