diff --git a/ddl/cache.lua b/ddl/cache.lua new file mode 100644 index 0000000..01d419a --- /dev/null +++ b/ddl/cache.lua @@ -0,0 +1,108 @@ +local fiber = require('fiber') + +local cache = nil + +local CACHE_LOCK_TIMEOUT = 3 +local SPACE_NAME_IDX = 1 +local SHARD_FUNC_NAME_IDX = 2 +local SHARD_FUNC_BODY_IDX = 3 + +-- Build cache and setup "on_replace" trigger. +-- +-- Cache structure format: +-- cache = { +-- spaces = { +-- 'space_name' = { +-- raw = {}, -- raw sharding metadata, used for ddl.get() +-- processed = {} -- sharding function that is ready to call or a string with error +-- } +-- }, +-- fetch_lock -- locking based on fiber.channel() +-- } +local function cache_build() + -- prevent cache_build() from being called concurrently by different fibers + local unlocked = cache.fetch_lock:put(true, CACHE_LOCK_TIMEOUT) + local trigger_found = false + + if not unlocked then + return + end + + -- clear cache + cache.spaces = {} + + if box.space._ddl_sharding_func == nil then + cache.fetch_lock:get() + return nil + end + + for _, tuple in box.space._ddl_sharding_func:pairs() do + local space_name = tuple[SPACE_NAME_IDX] + local func_name = tuple[SHARD_FUNC_NAME_IDX] + local func_body = tuple[SHARD_FUNC_BODY_IDX] + + cache.spaces[space_name] = {} + cache.spaces[space_name].sharding_func = { raw = tuple } + + if func_body ~= nil then + local sharding_func, err = loadstring('return ' .. func_body) + if sharding_func == nil then + cache.spaces[space_name].sharding_func.processed = + string.format("Body is incorrect in sharding_func for space (%s): %s", + space_name, err) + else + cache.spaces[space_name].sharding_func.processed = + sharding_func() + end + elseif func_name ~= nil then + local chunks = string.split(func_name, '.') + cache.spaces[space_name].sharding_func.processed = chunks + end + end + + cache._schema_version = box.internal.schema_version() + + for _, func in pairs(box.space._ddl_sharding_func:on_replace()) do + if func == cache_build then + trigger_found = true + break + end + end + + if not trigger_found then + box.space._ddl_sharding_func:on_replace(cache_build) + end + + cache.fetch_lock:get() +end + +-- Get data from cache. +-- Returns all cached data for "space_name". +local function cache_get(space_name) + if space_name == nil then + return nil + end + + local schema_version = box.internal.schema_version() + + if not cache then + cache = { + spaces = {}, + fetch_lock = fiber.channel(1) + } + cache_build() + end + + -- rebuild cache if database schema changed + if schema_version ~= cache._schema_version then + cache_build() + end + + return cache.spaces[space_name] +end + +return { + internal = { + get = cache_get, + } +} diff --git a/ddl/get.lua b/ddl/get.lua index f151ef4..a72c65d 100644 --- a/ddl/get.lua +++ b/ddl/get.lua @@ -1,4 +1,5 @@ local utils = require('ddl.utils') +local cache = require('ddl.cache') local ddl_check = require('ddl.check') local function _get_index_field_path(space, index_part) @@ -66,11 +67,25 @@ local function get_metadata(space_name, metadata_name) end local function get_sharding_func(space_name) - local record = get_metadata(space_name, "sharding_func") - if not record then + local record = cache.internal.get(space_name) + + if not record or not record.sharding_func then + return nil + end + + return record.sharding_func.processed +end + +local function get_sharding_func_raw(space_name) + local record = cache.internal.get(space_name) + + if not record or not record.sharding_func or not + record.sharding_func.raw then return nil end + record = record.sharding_func.raw + if record.sharding_func_body ~= nil then return {body = record.sharding_func_body} end @@ -97,7 +112,7 @@ local function get_space_schema(space_name) space_ddl.engine = box_space.engine space_ddl.format = box_space:format() space_ddl.sharding_key = get_sharding_key(space_name) - space_ddl.sharding_func = get_sharding_func(space_name) + space_ddl.sharding_func = get_sharding_func_raw(space_name) for _, field in ipairs(space_ddl.format) do if field.is_nullable == nil then field.is_nullable = false @@ -115,7 +130,7 @@ local function get_space_schema(space_name) end local function prepare_sharding_func_for_call(space_name, sharding_func_def) - if type(sharding_func_def) == 'string' then + if type(sharding_func_def) == 'table' then local sharding_func = utils.get_G_function(sharding_func_def) if sharding_func ~= nil and ddl_check.internal.is_callable(sharding_func) == true then @@ -123,13 +138,13 @@ local function prepare_sharding_func_for_call(space_name, sharding_func_def) end end - if type(sharding_func_def) == 'table' then - local sharding_func, err = loadstring('return ' .. sharding_func_def.body) - if sharding_func == nil then - return nil, string.format( - "Body is incorrect in sharding_func for space (%s): %s", space_name, err) - end - return sharding_func() + if type(sharding_func_def) == 'function' then + return sharding_func_def + end + + -- error from cache + if type(sharding_func_def) == 'string' then + return nil, sharding_func_def end return nil, string.format( diff --git a/ddl/utils.lua b/ddl/utils.lua index bb5a433..fd8d440 100644 --- a/ddl/utils.lua +++ b/ddl/utils.lua @@ -190,8 +190,16 @@ end -- foo.bar.baz -> chunks: foo bar baz -- foo -> chunks: foo local function get_G_function(func_name) - local chunks = string.split(func_name, '.') local sharding_func = _G + local chunks + + -- function name received from cache is already splitted + if type(func_name) == 'string' then + chunks = string.split(func_name, '.') + else + chunks = func_name + end + -- check is the each chunk an identifier for _, chunk in pairs(chunks) do if not check_name_isident(chunk) or sharding_func == nil then diff --git a/test/cache_test.lua b/test/cache_test.lua new file mode 100644 index 0000000..5e93a06 --- /dev/null +++ b/test/cache_test.lua @@ -0,0 +1,179 @@ +#!/usr/bin/env tarantool + +local t = require('luatest') +local db = require('test.db') +local ddl = require('ddl') +local cache = require('ddl.cache') + +local SPACE_NAME_IDX = 1 +local SHARD_FUNC_NAME_IDX = 2 +local SHARD_FUNC_BODY_IDX = 3 + +local test_space = { + engine = 'memtx', + is_local = true, + temporary = false, + format = { + {name = 'unsigned_nonnull', type = 'unsigned', is_nullable = false}, + {name = 'unsigned_nullable', type = 'unsigned', is_nullable = true}, + {name = 'integer_nonnull', type = 'integer', is_nullable = false}, + {name = 'integer_nullable', type = 'integer', is_nullable = true}, + {name = 'number_nonnull', type = 'number', is_nullable = false}, + {name = 'number_nullable', type = 'number', is_nullable = true}, + {name = 'boolean_nonnull', type = 'boolean', is_nullable = false}, + {name = 'boolean_nullable', type = 'boolean', is_nullable = true}, + {name = 'string_nonnull', type = 'string', is_nullable = false}, + {name = 'string_nullable', type = 'string', is_nullable = true}, + {name = 'scalar_nonnull', type = 'scalar', is_nullable = false}, + {name = 'scalar_nullable', type = 'scalar', is_nullable = true}, + {name = 'array_nonnull', type = 'array', is_nullable = false}, + {name = 'array_nullable', type = 'array', is_nullable = true}, + {name = 'map_nonnull', type = 'map', is_nullable = false}, + {name = 'map_nullable', type = 'map', is_nullable = true}, + {name = 'any_nonnull', type = 'any', is_nullable = false}, + {name = 'any_nullable', type = 'any', is_nullable = true}, + }, +} + +local primary_index = { + type = 'HASH', + unique = true, + parts = { + {path = 'string_nonnull', is_nullable = false, type = 'string'}, + {path = 'unsigned_nonnull', is_nullable = false, type = 'unsigned'}, + }, + name = 'primary' +} + +local bucket_id_idx = { + type = 'TREE', + unique = false, + parts = {{path = 'bucket_id', type = 'unsigned', is_nullable = false}}, + name = 'bucket_id' +} + +local func_body_first = 'function() return 42 end' +local func_body_second = 'function() return 24 end' + +local function rebuild_db(g) + db.drop_all() + + g.space = table.deepcopy(test_space) + table.insert(g.space.format, 1, { + name = 'bucket_id', type = 'unsigned', is_nullable = false + }) + + g.space.indexes = { + table.deepcopy(primary_index), + table.deepcopy(bucket_id_idx) + } + g.space.sharding_key = {'unsigned_nonnull', 'integer_nonnull'} + g.schema = { + spaces = { + space = g.space, + } + } +end + +local g = t.group() +g.before_all(db.init) +g.before_each(function() + rebuild_db(g) +end) + +function g.test_cache_processed_func_body() + g.schema.spaces.space.sharding_func = { + body = func_body_first + } + local ok, err = ddl.set_schema(g.schema) + t.assert_equals(err, nil) + t.assert_equals(ok, true) + + local res = cache.internal.get('space') + t.assert(res) + t.assert(res.sharding_func.processed) + res = res.sharding_func.processed + t.assert(type(res) == 'function') + t.assert_equals(res(), 42) +end + +function g.test_cache_processed_func_name() + local sharding_func_name = 'sharding_func' + rawset(_G, sharding_func_name, function(key) return key end) + g.schema.spaces.space.sharding_func = sharding_func_name + + local ok, err = ddl.set_schema(g.schema) + t.assert_equals(err, nil) + t.assert_equals(ok, true) + + local res = cache.internal.get('space') + t.assert(res) + t.assert(res.sharding_func.processed) + res = res.sharding_func.processed + t.assert(type(res) == 'table') + t.assert_equals(res[1], 'sharding_func') + + rawset(_G, sharding_func_name, nil) +end + +function g.test_cache_schema_changed() + g.schema.spaces.space.sharding_func = { + body = func_body_first + } + local ok, err = ddl.set_schema(g.schema) + t.assert_equals(err, nil) + t.assert_equals(ok, true) + + local res = cache.internal.get('space') + t.assert(res) + t.assert(res.sharding_func.raw) + res = res.sharding_func.raw + t.assert_equals(res[SPACE_NAME_IDX], 'space') + t.assert_equals(res[SHARD_FUNC_NAME_IDX], nil) + t.assert_equals(res[SHARD_FUNC_BODY_IDX], func_body_first) + + rebuild_db(g) + + g.schema.spaces.space.sharding_func = { + body = func_body_second + } + local ok, err = ddl.set_schema(g.schema) + t.assert_equals(err, nil) + t.assert_equals(ok, true) + + local res = cache.internal.get('space') + t.assert(res) + t.assert(res.sharding_func.raw) + res = res.sharding_func.raw + t.assert_equals(res[SPACE_NAME_IDX], 'space') + t.assert_equals(res[SHARD_FUNC_NAME_IDX], nil) + t.assert_equals(res[SHARD_FUNC_BODY_IDX], func_body_second) +end + +function g.test_cache_space_updated() + g.schema.spaces.space.sharding_func = { + body = func_body_first + } + local ok, err = ddl.set_schema(g.schema) + t.assert_equals(err, nil) + t.assert_equals(ok, true) + + local res = cache.internal.get('space') + t.assert(res) + t.assert(res.sharding_func.raw) + res = res.sharding_func.raw + t.assert_equals(res[SPACE_NAME_IDX], 'space') + t.assert_equals(res[SHARD_FUNC_NAME_IDX], nil) + t.assert_equals(res[SHARD_FUNC_BODY_IDX], func_body_first) + + box.space._ddl_sharding_func + :update({'space'}, {{'=', SHARD_FUNC_BODY_IDX, func_body_second}}) + + local res = cache.internal.get('space') + t.assert(res) + t.assert(res.sharding_func.raw) + res = res.sharding_func.raw + t.assert_equals(res[SPACE_NAME_IDX], 'space') + t.assert_equals(res[SHARD_FUNC_NAME_IDX], nil) + t.assert_equals(res[SHARD_FUNC_BODY_IDX], func_body_second) +end