Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

api: support vshard groups #319

Merged
merged 9 commits into from
Aug 31, 2022
19 changes: 14 additions & 5 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,19 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/)
and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.html).

## [Unreleased]

### Added
* Support `vshard_router` option in operations for Cartridge vshard groups
or non-default vshard routers (#44).

### Changed
* Deprecate using space id in `crud.len` (#255).

## [0.13.0] - 29-08-22

### Added
* `crud.storage_info` function to get storages status (#229, PR #299).
* `crud.storage_info` function to get storages status (#229).

### Fixed
* Fix specifying `vshard` sharding funcs (#314).
Expand All @@ -17,9 +26,9 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.

### Fixed
* Fetching invalid ddl configuration (sharding key for non-existing space)
is no longer breaks CRUD requests (#308, PR #309).
* ddl space record delete no more throws error if crud is used (#310, PR #311).
* crud sharding metainfo is now updated on ddl record delete (#310, PR #311).
is no longer breaks CRUD requests (#308).
* ddl space record delete no more throws error if crud is used (#310).
* crud sharding metainfo is now updated on ddl record delete (#310).

## [0.12.0] - 28-06-22

Expand All @@ -28,7 +37,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
`crud.insert_many()`/`crud.insert_object_many()`/
`crud.upsert_many()`/`crud.upsert_object_many()`
`crud.replace_many()`/`crud.replace_object_many()`
with partial consistency (#193, PR #232).
with partial consistency (#193).

## [0.11.3] - 15-06-22

Expand Down
51 changes: 48 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,9 @@ where:
* `timeout` (`?number`) - `vshard.call` timeout (in seconds)
* `bucket_id` (`?number|cdata`) - bucket ID
* `fields` (`?table`) - field names for getting only a subset of fields
* `vshard_router` (`?string|table`) - Cartridge vshard group name or
vshard router instance. Set this parameter if your space is not
a part of the default vshard cluster

Returns metadata and array contains one inserted row, error.

Expand Down Expand Up @@ -259,6 +262,9 @@ where:
* `rollback_on_error` (`?boolean`) - any failed operation will lead to
rollback on a storage, where the operation is failed, report error
about what tuples were rollback, default is `false`
* `vshard_router` (`?string|table`) - Cartridge vshard group name or
vshard router instance. Set this parameter if your space is not
a part of the default vshard cluster

Returns metadata and array with inserted rows, array of errors.
Each error object can contain field `operation_data`.
Expand Down Expand Up @@ -393,6 +399,9 @@ where:
* `prefer_replica` (`?boolean`) - if `true` then the preferred target is one of
the replicas
* `balance` (`?boolean`) - use replica according to vshard load balancing policy
* `vshard_router` (`?string|table`) - Cartridge vshard group name or
vshard router instance. Set this parameter if your space is not
a part of the default vshard cluster

Returns metadata and array contains one row, error.

Expand Down Expand Up @@ -426,6 +435,9 @@ where:
* `timeout` (`?number`) - `vshard.call` timeout (in seconds)
* `bucket_id` (`?number|cdata`) - bucket ID
* `fields` (`?table`) - field names for getting only a subset of fields
* `vshard_router` (`?string|table`) - Cartridge vshard group name or
vshard router instance. Set this parameter if your space is not
a part of the default vshard cluster

Returns metadata and array contains one updated row, error.

Expand Down Expand Up @@ -458,6 +470,9 @@ where:
* `timeout` (`?number`) - `vshard.call` timeout (in seconds)
* `bucket_id` (`?number|cdata`) - bucket ID
* `fields` (`?table`) - field names for getting only a subset of fields
* `vshard_router` (`?string|table`) - Cartridge vshard group name or
vshard router instance. Set this parameter if your space is not
a part of the default vshard cluster

Returns metadata and array contains one deleted row (empty for vinyl), error.

Expand Down Expand Up @@ -492,6 +507,9 @@ where:
* `timeout` (`?number`) - `vshard.call` timeout (in seconds)
* `bucket_id` (`?number|cdata`) - bucket ID
* `fields` (`?table`) - field names for getting only a subset of fields
* `vshard_router` (`?string|table`) - Cartridge vshard group name or
vshard router instance. Set this parameter if your space is not
a part of the default vshard cluster

Returns inserted or replaced rows and metadata or nil with error.

Expand Down Expand Up @@ -544,6 +562,9 @@ where:
* `rollback_on_error` (`?boolean`) - any failed operation will lead to
rollback on a storage, where the operation is failed, report error
about what tuples were rollback, default is `false`
* `vshard_router` (`?string|table`) - Cartridge vshard group name or
vshard router instance. Set this parameter if your space is not
a part of the default vshard cluster

Returns metadata and array with inserted/replaced rows, array of errors.
Each error object can contain field `operation_data`.
Expand Down Expand Up @@ -676,6 +697,9 @@ where:
* `timeout` (`?number`) - `vshard.call` timeout (in seconds)
* `bucket_id` (`?number|cdata`) - bucket ID
* `fields` (`?table`) - field names for getting only a subset of fields
* `vshard_router` (`?string|table`) - Cartridge vshard group name or
vshard router instance. Set this parameter if your space is not
a part of the default vshard cluster

Returns metadata and empty array of rows or nil, error.

Expand Down Expand Up @@ -733,6 +757,9 @@ where:
* `rollback_on_error` (`?boolean`) - any failed operation will lead to
rollback on a storage, where the operation is failed, report error
about what tuples were rollback, default is `false`
* `vshard_router` (`?string|table`) - Cartridge vshard group name or
vshard router instance. Set this parameter if your space is not
a part of the default vshard cluster

Returns metadata and array of errors.
Each error object can contain field `operation_data`.
Expand Down Expand Up @@ -869,6 +896,9 @@ where:
* `prefer_replica` (`?boolean`) - if `true` then the preferred target is one of
the replicas
* `balance` (`?boolean`) - use replica according to vshard load balancing policy
* `vshard_router` (`?string|table`) - Cartridge vshard group name or
vshard router instance. Set this parameter if your space is not
a part of the default vshard cluster


Returns metadata and array of rows, error.
Expand Down Expand Up @@ -971,7 +1001,7 @@ local res, err = crud.cut_rows(rows, metadata, fields)
where:

* `rows` (`table`) - array of tuples for cutting
* `matadata` (`?metadata`) - metadata about `rows` fields
* `metadata` (`?table`) - metadata about `rows` fields
* `fields` (`table`) - field names of fields that should be contained in the result

Returns metadata and array of rows, error.
Expand Down Expand Up @@ -1007,6 +1037,9 @@ where:
* `space_name` (`string`) - name of the space
* `opts`:
* `timeout` (`?number`) - `vshard.call` timeout (in seconds)
* `vshard_router` (`?string|table`) - Cartridge vshard group name or
vshard router instance. Set this parameter if your space is not
a part of the default vshard cluster

Returns true or nil with error.

Expand Down Expand Up @@ -1037,13 +1070,21 @@ local result, err = crud.len(space_name, opts)

where:

* `space_name` (`string|number`) - name of the space as well
as numerical id of the space
* `space_name` (`string`) - name of the space
* `opts`:
* `timeout` (`?number`) - `vshard.call` timeout (in seconds)
* `vshard_router` (`?string|table`) - Cartridge vshard group name or
vshard router instance. Set this parameter if your space is not
a part of the default vshard cluster

Returns number or nil with error.

Using space id instead of space name is also possible, but
deprecated and will be removed in future releases.
DifferentialOrange marked this conversation as resolved.
Show resolved Hide resolved

Using space id in crud.len and custom vshard_router is not
supported by statistics: space labels may be inconsistent.

**Example:**

Using `memtx`:
Expand Down Expand Up @@ -1087,6 +1128,7 @@ where:
* `opts`:
* `timeout` (`?number`) - maximum time (in seconds, default: 2) to wait for response from
cluster instances.
* `vshard_router` (`?string|table`) - Cartridge vshard group name or vshard router instance.

Returns storages status table by instance UUID or nil with error. Status table fields:
* `status` contains a string representing the status:
Expand Down Expand Up @@ -1156,6 +1198,9 @@ where:
* `balance` (`?boolean`) - use replica according to
[vshard load balancing policy](https://www.tarantool.io/en/doc/latest/reference/reference_rock/vshard/vshard_api/#router-api-call),
default value is `false`
* `vshard_router` (`?string|table`) - Cartridge vshard group name or
vshard router instance. Set this parameter if your space is not
a part of the default vshard cluster

```lua
crud.count('customers', {{'==', 'age', 35}})
Expand Down
34 changes: 24 additions & 10 deletions crud/borders.lua
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
local checks = require('checks')
local errors = require('errors')
local vshard = require('vshard')

local const = require('crud.common.const')
local dev_checks = require('crud.common.dev_checks')
Expand Down Expand Up @@ -67,15 +66,15 @@ else
end
end

local function call_get_border_on_router(border_name, space_name, index_name, opts)
checks('string', 'string', '?string|number', {
local function call_get_border_on_router(vshard_router, border_name, space_name, index_name, opts)
oleg-jukovec marked this conversation as resolved.
Show resolved Hide resolved
checks('table', 'string', 'string', '?string|number', {
timeout = '?number',
fields = '?table',
vshard_router = '?string|table',
})

opts = opts or {}

local space = utils.get_space(space_name, vshard.router.routeall())
local replicasets = vshard_router:routeall()
local space = utils.get_space(space_name, replicasets)
if space == nil then
return nil, BorderError:new("Space %q doesn't exist", space_name), const.NEED_SCHEMA_RELOAD
end
Expand All @@ -98,13 +97,12 @@ local function call_get_border_on_router(border_name, space_name, index_name, op
local cmp_key_parts = utils.merge_primary_key_parts(index.parts, primary_index.parts)
local field_names = utils.enrich_field_names_with_cmp_key(opts.fields, cmp_key_parts, space:format())

local replicasets = vshard.router.routeall()
local call_opts = {
mode = 'read',
replicasets = replicasets,
timeout = opts.timeout,
}
local results, err = call.map(
local results, err = call.map(vshard_router,
STAT_FUNC_NAME,
{border_name, space_name, index.id, field_names},
call_opts
Expand Down Expand Up @@ -160,8 +158,14 @@ local function call_get_border_on_router(border_name, space_name, index_name, op
end

local function get_border(border_name, space_name, index_name, opts)
return schema.wrap_func_reload(
call_get_border_on_router, border_name, space_name, index_name, opts
opts = opts or {}
local vshard_router, err = utils.get_vshard_router_instance(opts.vshard_router)
if err ~= nil then
return nil, BorderError:new(err)
end

return schema.wrap_func_reload(vshard_router, call_get_border_on_router,
border_name, space_name, index_name, opts
)
oleg-jukovec marked this conversation as resolved.
Show resolved Hide resolved
end

Expand All @@ -181,6 +185,11 @@ end
-- @tparam ?table opts.fields
-- Field names for getting only a subset of fields
--
-- @tparam ?string|table opts.vshard_router
-- Cartridge vshard group name or vshard router instance.
-- Set this parameter if your space is not a part of the
-- default vshard cluster.
--
-- @return[1] result
-- @treturn[2] nil
-- @treturn[2] table Error description
Expand All @@ -204,6 +213,11 @@ end
-- @tparam ?table opts.fields
-- Field names for getting only a subset of fields
--
-- @tparam ?string|table opts.vshard_router
-- Cartridge vshard group name or vshard router instance.
-- Set this parameter if your space is not a part of the
-- default vshard cluster.
--
-- @return[1] result
-- @treturn[2] nil
-- @treturn[2] table Error description
Expand Down
33 changes: 18 additions & 15 deletions crud/common/call.lua
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
local vshard = require('vshard')
local errors = require('errors')

local dev_checks = require('crud.common.dev_checks')
Expand Down Expand Up @@ -41,14 +40,14 @@ function call.get_vshard_call_name(mode, prefer_replica, balance)
return 'callbre'
end

local function wrap_vshard_err(err, func_name, replicaset_uuid, bucket_id)
local function wrap_vshard_err(vshard_router, err, func_name, replicaset_uuid, bucket_id)
-- Do not rewrite ShardingHashMismatchError class.
if err.class_name == sharding_utils.ShardingHashMismatchError.name then
return errors.wrap(err)
end

if replicaset_uuid == nil then
local replicaset, _ = vshard.router.route(bucket_id)
local replicaset, _ = vshard_router:route(bucket_id)
if replicaset == nil then
return CallError:new(
"Function returned an error, but we couldn't figure out the replicaset: %s", err
Expand All @@ -66,8 +65,8 @@ local function wrap_vshard_err(err, func_name, replicaset_uuid, bucket_id)
))
end

function call.map(func_name, func_args, opts)
dev_checks('string', '?table', {
function call.map(vshard_router, func_name, func_args, opts)
dev_checks('table', 'string', '?table', {
mode = 'string',
prefer_replica = '?boolean',
balance = '?boolean',
Expand All @@ -87,7 +86,11 @@ function call.map(func_name, func_args, opts)

local iter = opts.iter
if iter == nil then
iter, err = BaseIterator:new({func_args = func_args, replicasets = opts.replicasets})
iter, err = BaseIterator:new({
func_args = func_args,
replicasets = opts.replicasets,
vshard_router = vshard_router,
})
if err ~= nil then
return nil, err
end
Expand Down Expand Up @@ -135,8 +138,8 @@ function call.map(func_name, func_args, opts)
return postprocessor:get()
end

function call.single(bucket_id, func_name, func_args, opts)
dev_checks('number', 'string', '?table', {
function call.single(vshard_router, bucket_id, func_name, func_args, opts)
dev_checks('table', 'number', 'string', '?table', {
mode = 'string',
prefer_replica = '?boolean',
balance = '?boolean',
Expand All @@ -150,12 +153,12 @@ function call.single(bucket_id, func_name, func_args, opts)

local timeout = opts.timeout or const.DEFAULT_VSHARD_CALL_TIMEOUT

local res, err = vshard.router[vshard_call_name](bucket_id, func_name, func_args, {
local res, err = vshard_router[vshard_call_name](vshard_router, bucket_id, func_name, func_args, {
timeout = timeout,
})

if err ~= nil then
return nil, wrap_vshard_err(err, func_name, nil, bucket_id)
return nil, wrap_vshard_err(vshard_router, err, func_name, nil, bucket_id)
end

if res == box.NULL then
Expand All @@ -165,24 +168,24 @@ function call.single(bucket_id, func_name, func_args, opts)
return res
end

function call.any(func_name, func_args, opts)
dev_checks('string', '?table', {
function call.any(vshard_router, func_name, func_args, opts)
dev_checks('table', 'string', '?table', {
timeout = '?number',
})

local timeout = opts.timeout or const.DEFAULT_VSHARD_CALL_TIMEOUT

local replicasets, err = vshard.router.routeall()
local replicasets, err = vshard_router:routeall()
if replicasets == nil then
return nil, CallError:new("Failed to get all replicasets: %s", err.err)
return nil, CallError:new("Failed to get router replicasets: %s", err.err)
end
local replicaset = select(2, next(replicasets))

local res, err = replicaset:call(func_name, func_args, {
timeout = timeout,
})
if err ~= nil then
return nil, wrap_vshard_err(err, func_name, replicaset.uuid)
return nil, wrap_vshard_err(vshard_router, err, func_name, replicaset.uuid)
end

if res == box.NULL then
Expand Down
Loading