Please note: this project is deprecated and no longer maintained. Please use vshard instead.
An application-level library that provides sharding and client-side reliable replication for Tarantool. It implements single-phase and two-phase operations (with batching support), monitors the availability of nodes and automatically expels failed nodes from the cluster.
To shard data across nodes, a variant of consistent hashing is used. The shard key is determined automatically from the sharded space description.
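To give an intuition (a rough sketch only, not the module's actual internals), a consistent hash maps a shard key to one of N buckets so that changing N relocates only a small fraction of the keys. Tarantool's built-in `digest` module provides a Guava-style consistent hash that illustrates the idea:

```lua
-- Sketch of consistent hashing with Tarantool's digest module; assumes a
-- numeric shard key (a string key would first be hashed to an integer).
local digest = require('digest')

local function pick_bucket(shard_key, bucket_count)
    -- digest.guava() maps a 64-bit integer to a bucket in [0, bucket_count);
    -- growing bucket_count by one relocates only ~1/bucket_count of the keys.
    return digest.guava(shard_key, bucket_count) + 1
end
```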
- Add tarantool repository for yum or apt
- Install
$sudo [yum|apt-get] install tarantool tarantool-shard tarantool-pool
- redundancy - the redundancy factor: how many copies of each tuple to maintain in the cluster (see the sketch after this list)
- zone - a redundancy zone. A zone may represent a single machine or a single data center. The number of zones must be greater than or equal to the redundancy factor: duplicating data within the same zone doesn't increase availability
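For example, here is a hedged sketch (host names and ports are made up) of a two-zone configuration with `redundancy = 2`, so that each tuple is kept once in each zone:

```lua
-- Hypothetical layout: two zones, redundancy factor 2, so every tuple has
-- one copy in zone '1' and one copy in zone '2'.
local shard = require('shard')

shard.init {
    servers = {
        { uri = 'host-a:3301', zone = '1' },
        { uri = 'host-b:3301', zone = '2' },
    },
    login = 'guest',
    password = '',
    redundancy = 2
}
```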
This example starts a tarantool instance that connects to itself, creating a sharding configuration with a single zone and a single server.
If you need more servers, add entries to the `servers` part of the configuration. See "Configuration" below for details.
local shard = require('shard')
local json = require('json')
-- tarantool configuration
box.cfg {
wal_mode = 'none',
listen = 33021
}
box.schema.create_space('demo', {if_not_exists = true})
box.space.demo:create_index('primary',
{parts={1, 'unsigned'}, if_not_exists=true})
box.schema.user.grant('guest', 'read,write,execute',
'universe', nil, {if_not_exists = true})
box.schema.user.grant('guest', 'replication',
nil, nil, {if_not_exists = true})
-- sharding configuration
shard.init {
servers = {
{ uri = 'localhost:33021', zone = '0' },
},
login = 'guest',
password = '',
redundancy = 1
}
shard.demo:insert({1, 'test'})
shard.demo:replace({1, 'test2'})
shard.demo:update({1}, {{'=', 2, 'test3'}})
shard.demo:insert({2, 'test4'})
shard.demo:insert({3, 'test5'})
shard.demo:delete({3})
print(json.encode(shard.demo:select({1})))
print(json.encode(shard.demo:select({2})))
The sharding module can be tested with the Tarantool functional testing framework:
pip install -r test-run/requirements.txt
python test/test-run.py
cfg = {
servers = {
{ uri = 'localhost:33130', zone = '0' },
{ uri = 'localhost:33131', zone = '1' },
{ uri = 'localhost:33132', zone = '2' }
},
login = 'tester',
password = 'pass',
monitor = true,
pool_name = "default",
redundancy = 3,
rsd_max_rps = 1000,
replication = true
}
Where:
- `servers`: a list of dictionaries `{uri = '', zone = ''}` that describe the individual servers in the sharding configuration
- `login` and `password`: credentials that will be used to connect to `servers`
- `monitor`: whether to do active checks on the servers and remove them from sharding if they become unreachable (default `true`)
- `pool_name`: display name of the connection pool created for the group of `servers`. This only matters if you use the connpool module in parallel with the sharding module for other purposes; otherwise you may skip this option (default `'default'`)
- `redundancy`: how many copies of each tuple to maintain in the cluster (defaults to the number of zones)
- `replication`: set to `true` if redundancy is handled by replication (default `false`)
Timeout options are global and can be set before calling the `init()` function, like this:
shard = require 'shard'
local cfg = {...}
shard.REMOTE_TIMEOUT = 210
shard.HEARTBEAT_TIMEOUT = 500
shard.DEAD_TIMEOUT = 10
shard.RECONNECT_AFTER = 30
shard.init(cfg)
Where:
- `REMOTE_TIMEOUT`: a timeout in seconds for data access operations, like insert/update/delete (default is `210`)
- `HEARTBEAT_TIMEOUT`: a timeout in seconds before a heartbeat call will fail (default is `500`)
- `DEAD_TIMEOUT`: a timeout in seconds after which a non-responding node will be expelled from the cluster (default is `10`)
- `RECONNECT_AFTER`: allows you to ignore transient failures in remote operations; terminated connections will be re-established after the specified timeout in seconds. Under the hood, it uses the `reconnect_after` option of `net.box` (disabled by default, i.e. `msgpack.NULL`)
Initializes the sharding module, connects to all nodes and starts monitoring them.
- cfg - sharding configuration (see Configuration above)
Note that the sharding configuration can be changed dynamically, and it is your job to make sure such changes are reflected in this configuration: when you restart your cluster, the topology will be read from whatever you pass to `init()`.
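A minimal sketch of one way to do this, assuming you keep the current server list in a separate Lua file (`topology.lua` is a made-up name) that you rewrite whenever the cluster changes, so a restart picks up the latest topology:

```lua
-- topology.lua (hypothetical) returns the current server list, e.g.:
--   return { { uri = 'localhost:33130', zone = '0' }, ... }
local shard = require('shard')
local servers = dofile('topology.lua')

shard.init {
    servers = servers,
    login = 'tester',
    password = 'pass',
    redundancy = 1
}
```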
Returns status of the cluster from the point of view of each node.
Example output:
---
- localhost:3302:
localhost:3302: {'try': 0, 'ts': 1499270503.9233}
localhost:3301: {'try': 0, 'ts': 1499270507.0284}
localhost:3301:
localhost:3302: {'try': 0, 'ts': 1499270504.9097}
localhost:3301: {'try': 0, 'ts': 1499270506.8166}
...
Returns `true` if the heartbeat table contains data about each node, from the point of view of each other node. If the sharding module hasn't yet filled in the heartbeats, or there are dead nodes, this function will return `false`.
Returns `true` if all shards are connected and operational.
Wait until all shards are connected and operational.
Returns the status of all shards: whether they are online, offline or in maintenance.
Example output:
---
- maintenance: []
offline: []
online:
- uri: localhost:3301
id: 1
- uri: localhost:3302
id: 2
- uri: localhost:3303
id: 3
...
Appends a pair of redundant instances to the cluster, and initiates resharding.
- `servers` - a table of servers in the same format as in the configuration
This function should be called on one node and will propagate changes everywhere.
Example:
remote_append({{uri="localhost:3305", zone='2'},
{uri="localhost:3306", zone='2'}})
Returns: `true` on success
If a node got expelled from the cluster, you may bring it back by using `remote_join()`. It will reconnect to the node and allow write access to it.
There are two reasons why this may happen: either the node has died, or you've called `remote_unjoin()` on it.
Example:
remote_join(2)
Returns: `true` on success
Puts the node identified by `id` into maintenance mode. It will not receive writes, and will not be returned by the `shard()` function.
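Example (assuming it takes the same node id as `remote_join()` above):
remote_unjoin(2)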
Inserts `tuple` into the shard space. `tuple[1]` is treated as the shard key.
Returns: a table with the results of individual `insert()` calls on each redundant node.
Replaces `tuple` across the shard space. `tuple[1]` is treated as the shard key.
Returns: a table with the results of individual `replace()` calls on each redundant node.
Deletes tuples with primary key `key` across the shard space. `key[1]` is treated as the shard key.
Returns: a table with the results of individual `delete()` calls on each redundant node.
Updates a tuple across the shard space. Behaves the same way as Tarantool's `update()`. `key[1]` is treated as the shard key.
Returns: a table with the results of individual `update()` calls on each redundant node.
Inserts `tuple` into the shard space, automatically generating its primary key.
If the primary key is numeric, `auto_increment()` will use the next integer number. If the primary key is a string, `auto_increment()` will generate a new UUID.
Unlike the `insert()` operation, the shard key is determined from the space schema.
Returns: a table with the results of individual `auto_increment()` calls on each redundant node. The return value of each `auto_increment()` is the same as for the corresponding `insert()` call.
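For example, a hedged sketch reusing the `demo` space from the quick start, and assuming the same calling convention as Tarantool's own `space:auto_increment()` (the tuple is passed without its primary key field):

```lua
-- The primary key is generated automatically (the next integer here, since
-- the demo space has an unsigned primary key).
shard.demo:auto_increment({'test6'})
```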
Two-phase operations work, well, in two phases. The first phase pushes the operation into an auxiliary space "operations" on all of the involved shards, according to the redundancy factor. As soon as the operation has been propagated to the shards, a separate call triggers its execution on all shards. If the caller dies before invoking the second phase, the shards figure out by themselves that the operation has been propagated and execute it anyway (this just takes a while, since the check is performed only periodically). The operation id is necessary to avoid double execution of the same operation (at-most-once execution semantics) and must be provided by the user. The status of an operation can always be checked, given its operation id, provided that it has not been pruned from the "operations" space.
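For example, a sketch using the `demo` space from the quick start; the operation id `1` is an arbitrary, caller-chosen identifier:

```lua
-- q_insert(operation_id, tuple): the caller supplies a unique operation id
-- so the operation is executed at most once even if it is retried.
shard.demo:q_insert(1, {10, 'two-phase test'})
```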
Inserts `tuple` into the shard space.
- `operation_id` is a unique operation identifier (see "Two-phase operations")
- `tuple[1]` is treated as the shard key

Returns: `tuple`
Replaces `tuple` across the shard space.
- `operation_id` is a unique operation identifier (see "Two-phase operations")
- `tuple[1]` is treated as the shard key

Returns: `tuple`
Deletes tuples with primary key `key` across the shard space.
- `operation_id` is a unique operation identifier (see "Two-phase operations")
- `key` is treated as the shard key

Returns: nothing
Updates a tuple across the shard space. Behaves the same way as Tarantool's `update()`.
- `operation_id` is a unique operation identifier (see "Two-phase operations")
- `key` is treated as the shard key

Returns: nothing
Inserts `tuple` into the shard space, automatically generating its primary key.
- `operation_id` is a unique operation identifier (see "Two-phase operations")

If the primary key is numeric, `auto_increment()` will use the next integer number. If the primary key is a string, `auto_increment()` will generate a new UUID.
Unlike the `insert()` operation, the shard key is determined from the space schema.
Returns: `tuple`
Checks the operation status on all nodes. If the operation hasn't finished yet, waits for its completion for up to 5 seconds.
- `operation_id` - a unique operation identifier
- `tuple_id` - the tuple's primary key

Returns: `true` if the operation has completed, `false` otherwise.
`q_begin()` returns an object that wraps multiple sequential two-phase operations into one batch. You can use it the same way you use the shard object:
batch_obj = shard.q_begin()
batch_obj.demo:q_insert(1, {0, 'test'})
batch_obj.demo:q_replace(2, {0, 'test2'})
batch_obj:q_end()
When you call `q_end()`, the batch will be executed in one shot.
If there are pending two-phase operations, wait until they complete.