Skip to content

Commit

Permalink
Copy the content of sharding_key.lua file to `sharding_metadata.lua…
Browse files Browse the repository at this point in the history
…` file

PR #181 introduced support of DDL sharding keys. Implementation of
sharding keys support contains methods that are common to support
sharding keys and sharding functions. That's why a separate file
`sharding_metadata.lua` was created to contain common methods.
This commit relocates functions for fetching sharding key from
`sharding_key.lua` file to `sharding_metadata.lua` file
to simplify a reviewer's life and display the history of
changes relative to PR #181 in the following commits.

Part of #237
  • Loading branch information
AnaNek committed Feb 1, 2022
1 parent 14d39f2 commit 9016ed6
Show file tree
Hide file tree
Showing 9 changed files with 158 additions and 148 deletions.
4 changes: 2 additions & 2 deletions crud.lua
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ local select = require('crud.select')
local truncate = require('crud.truncate')
local len = require('crud.len')
local borders = require('crud.borders')
local sharding_key = require('crud.common.sharding.sharding_key')
local sharding_metadata = require('crud.common.sharding.sharding_metadata')
local utils = require('crud.common.utils')

local crud = {}
Expand Down Expand Up @@ -114,7 +114,7 @@ function crud.init_storage()
truncate.init()
len.init()
borders.init()
sharding_key.init()
sharding_metadata.init()
end

function crud.init_router()
Expand Down
4 changes: 2 additions & 2 deletions crud/common/sharding/init.lua
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ local errors = require('errors')
local BucketIDError = errors.new_class("BucketIDError", {capture_stack = false})

local utils = require('crud.common.utils')
local sharding_key_module = require('crud.common.sharding.sharding_key')
local sharding_metadata_module = require('crud.common.sharding.sharding_metadata')

local sharding = {}

Expand All @@ -22,7 +22,7 @@ function sharding.tuple_get_bucket_id(tuple, space, specified_bucket_id)
end

local sharding_index_parts = space.index[0].parts
local sharding_key_as_index_obj, err = sharding_key_module.fetch_on_router(space.name)
local sharding_key_as_index_obj, err = sharding_metadata_module.fetch_on_router(space.name)
if err ~= nil then
return nil, err
end
Expand Down
135 changes: 1 addition & 134 deletions crud/common/sharding/sharding_key.lua
Original file line number Diff line number Diff line change
@@ -1,45 +1,14 @@
local fiber = require('fiber')
local errors = require('errors')

local call = require('crud.common.call')
local const = require('crud.common.const')
local dev_checks = require('crud.common.dev_checks')
local cache = require('crud.common.sharding.sharding_key_cache')
local cache = require('crud.common.sharding.sharding_metadata_cache')
local utils = require('crud.common.utils')

local ShardingKeyError = errors.new_class("ShardingKeyError", {capture_stack = false})
local FetchShardingKeyError = errors.new_class('FetchShardingKeyError', {capture_stack = false})
local WrongShardingConfigurationError = errors.new_class('WrongShardingConfigurationError', {capture_stack = false})

local FETCH_FUNC_NAME = '_crud.fetch_on_storage'

local sharding_key_module = {}

-- Function decorator that is used to prevent _fetch_on_router() from being
-- called concurrently by different fibers.
local function locked(f)
dev_checks('function')

return function(timeout, ...)
local timeout_deadline = fiber.clock() + timeout
local ok = cache.fetch_lock:put(true, timeout)
-- channel:put() returns false in two cases: when timeout is exceeded
-- or channel has been closed. However error message describes only
-- first reason, I'm not sure we need to disclose to users such details
-- like problems with synchronization objects.
if not ok then
return FetchShardingKeyError:new(
"Timeout for fetching sharding key is exceeded")
end
local timeout = timeout_deadline - fiber.clock()
local status, err = pcall(f, timeout, ...)
cache.fetch_lock:get()
if not status or err ~= nil then
return err
end
end
end

-- Build a structure similar to index, but it is not a real index object,
-- it contains only parts key with fieldno's.
local function as_index_object(space_name, space_format, sharding_key_def)
Expand All @@ -59,104 +28,6 @@ local function as_index_object(space_name, space_format, sharding_key_def)
return {parts = fieldnos}
end

-- Return a map with metadata or nil when space box.space._ddl_sharding_key is
-- not available on storage.
function sharding_key_module.fetch_on_storage()
local sharding_key_space = box.space._ddl_sharding_key
if sharding_key_space == nil then
return nil
end

local SPACE_NAME_FIELDNO = 1
local SPACE_SHARDING_KEY_FIELDNO = 2
local metadata_map = {}
for _, tuple in sharding_key_space:pairs() do
local space_name = tuple[SPACE_NAME_FIELDNO]
local sharding_key_def = tuple[SPACE_SHARDING_KEY_FIELDNO]
local space_format = box.space[space_name]:format()
metadata_map[space_name] = {
sharding_key_def = sharding_key_def,
space_format = space_format,
}
end

return metadata_map
end

-- Under high load we may get a case when more than one fiber will fetch
-- metadata from storages. It is not good from performance point of view.
-- locked() wraps a _fetch_on_router() to limit a number of fibers that fetches
-- a sharding metadata by a single one, other fibers will wait while
-- cache.fetch_lock become unlocked during timeout passed to
-- _fetch_on_router().
local _fetch_on_router = locked(function(timeout)
dev_checks('number')

if cache.sharding_key_as_index_obj_map ~= nil then
return
end

local metadata_map, err = call.any(FETCH_FUNC_NAME, {}, {
timeout = timeout
})
if err ~= nil then
return err
end
if metadata_map == nil then
cache.sharding_key_as_index_obj_map = {}
return
end

cache.sharding_key_as_index_obj_map = {}
for space_name, metadata in pairs(metadata_map) do
local sharding_key_as_index_obj, err = as_index_object(space_name,
metadata.space_format,
metadata.sharding_key_def)
if err ~= nil then
return err
end
cache.sharding_key_as_index_obj_map[space_name] = sharding_key_as_index_obj
end
end)

-- Get sharding index for a certain space.
--
-- Return:
-- - sharding key as index object, when sharding key definition found on
-- storage.
-- - nil, when sharding key definition was not found on storage. Pay attention
-- that nil without error is a successfull return value.
-- - nil and error, when something goes wrong on fetching attempt.
--
function sharding_key_module.fetch_on_router(space_name, timeout)
dev_checks('string', '?number')

if cache.sharding_key_as_index_obj_map ~= nil then
return cache.sharding_key_as_index_obj_map[space_name]
end

local timeout = timeout or const.FETCH_SHARDING_KEY_TIMEOUT
local err = _fetch_on_router(timeout)
if err ~= nil then
if cache.sharding_key_as_index_obj_map ~= nil then
return cache.sharding_key_as_index_obj_map[space_name]
end
return nil, err
end

if cache.sharding_key_as_index_obj_map ~= nil then
return cache.sharding_key_as_index_obj_map[space_name]
end

return nil, FetchShardingKeyError:new(
"Fetching sharding key for space '%s' is failed", space_name)
end

function sharding_key_module.update_cache(space_name)
cache.drop_caches()
return sharding_key_module.fetch_on_router(space_name)
end

-- Make sure sharding key definition is a part of primary key.
local function is_part_of_pk(space_name, primary_index_parts, sharding_key_as_index_obj)
dev_checks('string', 'table', 'table')
Expand Down Expand Up @@ -229,10 +100,6 @@ function sharding_key_module.extract_from_pk(space_name, primary_index_parts, pr
return extract_from_index(primary_key, primary_index_parts, sharding_key_as_index_obj)
end

function sharding_key_module.init()
_G._crud.fetch_on_storage = sharding_key_module.fetch_on_storage
end

sharding_key_module.internal = {
as_index_object = as_index_object,
extract_from_index = extract_from_index,
Expand Down
142 changes: 142 additions & 0 deletions crud/common/sharding/sharding_metadata.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
local fiber = require('fiber')
local errors = require('errors')

local call = require('crud.common.call')
local const = require('crud.common.const')
local dev_checks = require('crud.common.dev_checks')
local cache = require('crud.common.sharding.sharding_metadata_cache')

local FetchShardingKeyError = errors.new_class('FetchShardingKeyError', {capture_stack = false})

local FETCH_FUNC_NAME = '_crud.fetch_on_storage'

local sharding_key_module = {}

-- Function decorator that is used to prevent _fetch_on_router() from being
-- called concurrently by different fibers.
local function locked(f)
dev_checks('function')

return function(timeout, ...)
local timeout_deadline = fiber.clock() + timeout
local ok = cache.fetch_lock:put(true, timeout)
-- channel:put() returns false in two cases: when timeout is exceeded
-- or channel has been closed. However error message describes only
-- first reason, I'm not sure we need to disclose to users such details
-- like problems with synchronization objects.
if not ok then
return FetchShardingKeyError:new(
"Timeout for fetching sharding key is exceeded")
end
local timeout = timeout_deadline - fiber.clock()
local status, err = pcall(f, timeout, ...)
cache.fetch_lock:get()
if not status or err ~= nil then
return err
end
end
end

-- Return a map with metadata or nil when space box.space._ddl_sharding_key is
-- not available on storage.
function sharding_key_module.fetch_on_storage()
local sharding_key_space = box.space._ddl_sharding_key
if sharding_key_space == nil then
return nil
end

local SPACE_NAME_FIELDNO = 1
local SPACE_SHARDING_KEY_FIELDNO = 2
local metadata_map = {}
for _, tuple in sharding_key_space:pairs() do
local space_name = tuple[SPACE_NAME_FIELDNO]
local sharding_key_def = tuple[SPACE_SHARDING_KEY_FIELDNO]
local space_format = box.space[space_name]:format()
metadata_map[space_name] = {
sharding_key_def = sharding_key_def,
space_format = space_format,
}
end

return metadata_map
end

-- Under high load we may get a case when more than one fiber will fetch
-- metadata from storages. It is not good from performance point of view.
-- locked() wraps a _fetch_on_router() to limit a number of fibers that fetches
-- a sharding metadata by a single one, other fibers will wait while
-- cache.fetch_lock become unlocked during timeout passed to
-- _fetch_on_router().
local _fetch_on_router = locked(function(timeout)
dev_checks('number')

if cache.sharding_key_as_index_obj_map ~= nil then
return
end

local metadata_map, err = call.any(FETCH_FUNC_NAME, {}, {
timeout = timeout
})
if err ~= nil then
return err
end
if metadata_map == nil then
cache.sharding_key_as_index_obj_map = {}
return
end

cache.sharding_key_as_index_obj_map = {}
for space_name, metadata in pairs(metadata_map) do
local sharding_key_as_index_obj, err = as_index_object(space_name,
metadata.space_format,
metadata.sharding_key_def)
if err ~= nil then
return err
end
cache.sharding_key_as_index_obj_map[space_name] = sharding_key_as_index_obj
end
end)

-- Get sharding index for a certain space.
--
-- Return:
-- - sharding key as index object, when sharding key definition found on
-- storage.
-- - nil, when sharding key definition was not found on storage. Pay attention
-- that nil without error is a successfull return value.
-- - nil and error, when something goes wrong on fetching attempt.
--
function sharding_key_module.fetch_on_router(space_name, timeout)
dev_checks('string', '?number')

if cache.sharding_key_as_index_obj_map ~= nil then
return cache.sharding_key_as_index_obj_map[space_name]
end

local timeout = timeout or const.FETCH_SHARDING_KEY_TIMEOUT
local err = _fetch_on_router(timeout)
if err ~= nil then
if cache.sharding_key_as_index_obj_map ~= nil then
return cache.sharding_key_as_index_obj_map[space_name]
end
return nil, err
end

if cache.sharding_key_as_index_obj_map ~= nil then
return cache.sharding_key_as_index_obj_map[space_name]
end

return nil, FetchShardingKeyError:new(
"Fetching sharding key for space '%s' is failed", space_name)
end

function sharding_key_module.update_cache(space_name)
cache.drop_caches()
return sharding_key_module.fetch_on_router(space_name)
end

function sharding_key_module.init()
_G._crud.fetch_on_storage = sharding_key_module.fetch_on_storage
end

return sharding_key_module
File renamed without changes.
4 changes: 2 additions & 2 deletions crud/select/compat/select.lua
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ local sharding = require('crud.common.sharding')
local dev_checks = require('crud.common.dev_checks')
local common = require('crud.select.compat.common')
local schema = require('crud.common.schema')
local sharding_key_module = require('crud.common.sharding.sharding_key')
local sharding_metadata_module = require('crud.common.sharding.sharding_metadata')

local compare_conditions = require('crud.compare.conditions')
local select_plan = require('crud.select.plan')
Expand Down Expand Up @@ -51,7 +51,7 @@ local function build_select_iterator(space_name, user_conditions, opts)
return nil, SelectError:new("Space %q doesn't exist", space_name), true
end
local space_format = space:format()
local sharding_key_as_index_obj, err = sharding_key_module.fetch_on_router(space_name)
local sharding_key_as_index_obj, err = sharding_metadata_module.fetch_on_router(space_name)
if err ~= nil then
return nil, err
end
Expand Down
4 changes: 2 additions & 2 deletions crud/select/compat/select_old.lua
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ local utils = require('crud.common.utils')
local sharding = require('crud.common.sharding')
local dev_checks = require('crud.common.dev_checks')
local schema = require('crud.common.schema')
local sharding_key_module = require('crud.common.sharding.sharding_key')
local sharding_metadata_module = require('crud.common.sharding.sharding_metadata')

local compare_conditions = require('crud.compare.conditions')
local select_plan = require('crud.select.plan')
Expand Down Expand Up @@ -103,7 +103,7 @@ local function build_select_iterator(space_name, user_conditions, opts)
return nil, SelectError:new("Space %q doesn't exist", space_name), true
end
local space_format = space:format()
local sharding_key_as_index_obj, err = sharding_key_module.fetch_on_router(space_name)
local sharding_key_as_index_obj, err = sharding_metadata_module.fetch_on_router(space_name)
if err ~= nil then
return nil, err
end
Expand Down
4 changes: 2 additions & 2 deletions test/helper.lua
Original file line number Diff line number Diff line change
Expand Up @@ -325,10 +325,10 @@ end

function helpers.update_cache(cluster, space_name)
return cluster.main_server.net_box:eval([[
local sharding_key = require('crud.common.sharding.sharding_key')
local sharding_metadata = require('crud.common.sharding.sharding_metadata')
local space_name = ...
return sharding_key.update_cache(space_name)
return sharding_metadata.update_cache(space_name)
]], {space_name})
end

Expand Down
Loading

0 comments on commit 9016ed6

Please sign in to comment.