The ddl.bucket_id() function needs to know the sharding function of a space, and it is costly to fetch the function declaration/definition stored in the _ddl_sharding_func space on every call. This patch adds a sharding function cache divided into two parts: raw and processed.

The raw part is used by the get_schema() method and stores the tuple as is. The processed part is used by bucket_id(). A processed sharding_func cache entry may be:

* a table with parsed dot notation (like {'foo', 'bar'})
* a function ready to call, which avoids repeated loadstring() calls
* a string with an error

The cache is rebuilt if:

* the _ddl_sharding_func space changes: the cache sets an _ddl_sharding_func:on_replace trigger
* the schema changes: the cache compares box.internal.schema_version() against the version recorded at build time

This patch does not support hot reload techniques, so an on_replace trigger duplication occurs if a hot reload happens. Hot reload support will be done in a separate task: #87

Closes #82
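As a rough sketch of how the processed part might be consumed (the helper below is hypothetical and not part of this patch), bucket_id() can take the cached entry as is: call it directly if it is a function, resolve the parsed dot notation against the global environment if it is a table, or report the stored error if it is a string:

-- Hypothetical helper (not part of this patch) showing how bucket_id()
-- could consume a processed cache entry without re-reading the
-- _ddl_sharding_func space or re-compiling the function body.
local function call_sharding_func(processed, sharding_key)
    if processed == nil then
        return nil, 'sharding function is not specified'
    end

    if type(processed) == 'string' then
        -- cache_build() stored an error message instead of a function
        return nil, processed
    end

    if type(processed) == 'function' then
        -- a function compiled once from the stored body
        return processed(sharding_key)
    end

    -- a table with parsed dot notation (like {'foo', 'bar'}):
    -- resolve it against the global environment
    local func = _G
    for _, chunk in ipairs(processed) do
        if type(func) ~= 'table' then
            func = nil
            break
        end
        func = func[chunk]
    end

    if type(func) ~= 'function' then
        return nil, ('sharding function %s is not callable'):format(
            table.concat(processed, '.'))
    end

    return func(sharding_key)
end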
Showing 4 changed files with 344 additions and 12 deletions.
@@ -0,0 +1,128 @@
local fiber = require('fiber')

local cache = nil

local CACHE_LOCK_TIMEOUT = 3
local SPACE_NAME_IDX = 1
local SHARD_FUNC_NAME_IDX = 2
local SHARD_FUNC_BODY_IDX = 3

-- Function decorator that is used to prevent cache_build() from being
-- called concurrently by different fibers.
local function locked(f)
    return function(...)
        local ok = cache.lock:put(true, CACHE_LOCK_TIMEOUT)

        if not ok then
            error("cache lock timeout is exceeded")
        end

        local status, err = pcall(f, ...)
        cache.lock:get()

        if not status or err ~= nil then
            return err
        end
    end
end

-- Build cache and setup "on_replace" trigger.
--
-- Cache structure format:
-- cache = {
--     spaces = {
--         [space_name] = {
--             sharding_func = {
--                 raw = <tuple>, -- raw sharding metadata, used for ddl.get_schema()
--                 processed = <table|function|string>
--                     -- a table with parsed dot notation (like {'foo', 'bar'}),
--                     -- a function ready to call,
--                     -- or a string with an error
--             }
--         }
--     },
--     lock,            -- locking based on fiber.channel()
--     _schema_version, -- box.internal.schema_version() at build time
-- }
local cache_build = locked(function()
    -- clear cache
    cache.spaces = {}

    if box.space._ddl_sharding_func == nil then
        return
    end

    for _, tuple in box.space._ddl_sharding_func:pairs() do
        local space_name = tuple[SPACE_NAME_IDX]
        local func_name = tuple[SHARD_FUNC_NAME_IDX]
        local func_body = tuple[SHARD_FUNC_BODY_IDX]

        cache.spaces[space_name] = {}
        cache.spaces[space_name].sharding_func = { raw = tuple }

        if func_body ~= nil then
            -- A function body is stored: compile it once here so that
            -- bucket_id() does not have to call loadstring() on every request.
            local sharding_func, err = loadstring('return ' .. func_body)
            if sharding_func == nil then
                cache.spaces[space_name].sharding_func.processed =
                    string.format("Body is incorrect in sharding_func for space (%s): %s",
                        space_name, err)
            else
                cache.spaces[space_name].sharding_func.processed =
                    sharding_func()
            end
        elseif func_name ~= nil then
            -- A function name is stored: pre-split the dot notation
            -- (like 'foo.bar' -> {'foo', 'bar'}).
            local chunks = string.split(func_name, '.')
            cache.spaces[space_name].sharding_func.processed = chunks
        end
    end

    cache._schema_version = box.internal.schema_version()
end)

-- Rebuild cache if _ddl_sharding_func space changed.
local function cache_set_trigger()
    if box.space._ddl_sharding_func == nil then
        return
    end

    local trigger_found = false

    for _, func in pairs(box.space._ddl_sharding_func:on_replace()) do
        if func == cache_build then
            trigger_found = true
            break
        end
    end

    if not trigger_found then
        box.space._ddl_sharding_func:on_replace(cache_build)
    end
end

-- Get data from cache.
-- Returns all cached data for "space_name".
local function cache_get(space_name)
    if space_name == nil then
        return nil
    end

    local schema_version = box.internal.schema_version()

    if not cache then
        cache = {
            spaces = {},
            lock = fiber.channel(1)
        }
        cache_build()
        cache_set_trigger()
    end

    -- rebuild cache if database schema changed
    if schema_version ~= cache._schema_version then
        cache_build()
        cache_set_trigger()
    end

    return cache.spaces[space_name]
end

return {
    internal = {
        get = cache_get,
    }
}
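A minimal usage sketch of the module above, assuming it can be loaded as require('ddl.cache') and that a space named 'customers' has a sharding function defined (both names are assumptions, not taken from the diff):

local cache = require('ddl.cache') -- assumed module path for the file above

-- The first call builds the cache and installs the on_replace trigger;
-- later calls return the cached entry until box.internal.schema_version()
-- changes or the _ddl_sharding_func space is modified.
local record = cache.internal.get('customers') -- 'customers' is a made-up space name
if record ~= nil then
    -- record.sharding_func.raw is the _ddl_sharding_func tuple as is,
    -- record.sharding_func.processed is a ready-to-call function, a parsed
    -- dot-notation table, or an error string.
    print(type(record.sharding_func.processed))
end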