Skip to content

Commit

Permalink
storage: allow to choose rebalancer location
Browse files Browse the repository at this point in the history
Currently people struggle to find the rebalancer location because
it is unclear where it is spawned automatically (on the master
with the minimal UUID).

The patch allows choosing in the configuration where the
rebalancer should work, and whether it should work at all.

Closes #278

@TarantoolBot document
Title: vshard: cfg to choose rebalancer mode and location

There are 2 new config options:

- Root option 'rebalancer_mode'. It can have one of the following
  values:
  - 'auto' - works like now. The rebalancer is selected as the
  master with the minimal UUID;
  - 'off' - rebalancer won't be active at all;
  - 'manual' - need to manually mark the node where the rebalancer
  should be spawned. The selection happens in the config.

  Default is 'auto'.

- Instance option 'rebalancer'. It can be true or false. At most
  one instance in the config can have it set to true. If the
  rebalancer mode is 'manual' and none of the instances have this
  flag true, then the rebalancer won't be started anywhere. This
  option is ignored when the mode is not 'manual'. Default is
  false.
  • Loading branch information
Gerold103 committed Aug 18, 2023
1 parent b3c27b3 commit 7af20eb
Show file tree
Hide file tree
Showing 5 changed files with 284 additions and 9 deletions.
182 changes: 182 additions & 0 deletions test/storage-luatest/storage_2_2_test.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
local t = require('luatest')
local vtest = require('test.luatest_helpers.vtest')
local vutil = require('vshard.util')

-- One test-group parametrization per engine; when MVCC is available,
-- duplicate both engines with memtx_use_mvcc_engine enabled.
local group_config = {{engine = 'memtx'}, {engine = 'vinyl'}}

if vutil.feature.memtx_mvcc then
    for _, engine in ipairs({'memtx', 'vinyl'}) do
        table.insert(group_config, {
            engine = engine, memtx_use_mvcc_engine = true,
        })
    end
end

local test_group = t.group('storage', group_config)

-- Baseline cluster config: 2 replicasets with a master and a replica
-- each. The rebalancer is disabled by default so each test case can
-- switch to exactly the mode it needs.
local cfg_template = {
    sharding = {
        {
            replicas = {
                replica_1_a = {master = true},
                replica_1_b = {},
            },
        },
        {
            replicas = {
                replica_2_a = {master = true},
                replica_2_b = {},
            },
        },
    },
    bucket_count = 16,
    rebalancer_mode = 'off',
}
local global_cfg

-- Build the cluster once per group parametrization: propagate the MVCC
-- flag of this parametrization into the template, start the instances
-- and bootstrap the buckets.
test_group.before_all(function(g)
cfg_template.memtx_use_mvcc_engine = g.params.memtx_use_mvcc_engine
global_cfg = vtest.config_new(cfg_template)

vtest.cluster_new(g, global_cfg)
vtest.cluster_bootstrap(g, global_cfg)
end)

-- Drop the whole cluster after the last test of the group.
test_group.after_all(function(g)
g.cluster:drop()
end)

-- Scan the whole cluster and return the name of the only instance
-- running the rebalancer fiber, or nil when there is none. Fails the
-- test if more than one instance claims the rebalancer role.
local function find_rebalancer_name(g)
    local res, err = vtest.cluster_exec_each(g, function()
        return ivshard.storage.internal.rebalancer_fiber ~= nil
    end)
    t.assert_equals(err, nil)
    local found
    for instance_name, is_rebalancer in pairs(res) do
        if is_rebalancer then
            -- At most one rebalancer may exist cluster-wide.
            t.assert_equals(found, nil)
            found = instance_name
        end
    end
    return found
end

test_group.test_rebalancer_mode_off_no_marks = function(g)
    -- Mode 'off' in the base config: no instance may run the
    -- rebalancer fiber.
    local _, err = vtest.cluster_exec_each(g, function()
        ilt.assert(ivshard.storage.internal.rebalancer_fiber == nil)
    end)
    t.assert_equals(err, nil)
end

test_group.test_rebalancer_mode_off_ignore_marks = function(g)
    -- With mode = 'off' an explicit per-instance 'rebalancer' mark must
    -- not spawn the rebalancer anywhere.
    local new_cfg_template = table.deepcopy(cfg_template)
    new_cfg_template.sharding[1].replicas.replica_1_a.rebalancer = true
    vtest.cluster_cfg(g, vtest.config_new(new_cfg_template))
    local _, err = vtest.cluster_exec_each(g, function()
        ilt.assert(ivshard.storage.internal.rebalancer_fiber == nil)
    end)
    t.assert_equals(err, nil)
    -- Restore the original config.
    vtest.cluster_cfg(g, global_cfg)
end

test_group.test_rebalancer_mode_auto_ignore_marks = function(g)
    -- In 'auto' mode the per-instance 'rebalancer' marks are ignored
    -- and the master with the minimal UUID is picked, as before the
    -- option existed.
    local new_cfg_template = table.deepcopy(cfg_template)
    new_cfg_template.rebalancer_mode = 'auto'
    local new_cfg = vtest.config_new(new_cfg_template)
    local res, err = vtest.cluster_exec_each(g, function(cfg)
        -- Tell each instance it should be the rebalancer. But it must
        -- be ignored when mode = 'auto'.
        local info = box.info
        cfg.sharding[ivutil.replicaset_uuid(info)]
            .replicas[info.uuid].rebalancer = true
        local _, err = ivshard.storage.cfg(cfg, info.uuid)
        ilt.assert_equals(err, nil)
        return {
            has_rebalancer = ivshard.storage.internal.rebalancer_fiber ~= nil,
            uuid = info.uuid,
        }
    end, {new_cfg})
    t.assert_equals(err, nil)
    local min_uuid, rebalancer_uuid
    for _, r in pairs(res) do
        if min_uuid == nil or r.uuid < min_uuid then
            min_uuid = r.uuid
        end
        if r.has_rebalancer then
            -- Only one instance may run the rebalancer.
            t.assert_equals(rebalancer_uuid, nil)
            rebalancer_uuid = r.uuid
        end
    end
    t.assert_equals(min_uuid, rebalancer_uuid)
    -- Restore the original config.
    vtest.cluster_cfg(g, global_cfg)
end

test_group.test_rebalancer_mode_manual_no_marks = function(g)
    -- Manual mode without any marked instance: the rebalancer must not
    -- be started anywhere.
    local new_cfg_template = table.deepcopy(cfg_template)
    new_cfg_template.rebalancer_mode = 'manual'
    vtest.cluster_cfg(g, vtest.config_new(new_cfg_template))
    local _, err = vtest.cluster_exec_each(g, function()
        ilt.assert(ivshard.storage.internal.rebalancer_fiber == nil)
    end)
    t.assert_equals(err, nil)
    -- Restore the original config.
    vtest.cluster_cfg(g, global_cfg)
end

-- Manual mode with a marked instance: the rebalancer must follow the
-- mark, whether it points at a master or at a replica, and must stay
-- fully functional when running on a replica.
test_group.test_rebalancer_mode_manual_with_mark = function(g)
local new_cfg_template = table.deepcopy(cfg_template)
new_cfg_template.rebalancer_mode = 'manual'

-- Try a master.
new_cfg_template.sharding[1].replicas.replica_1_a.rebalancer = true
local new_cfg = vtest.config_new(new_cfg_template)
vtest.cluster_cfg(g, new_cfg)
t.assert_equals(find_rebalancer_name(g), 'replica_1_a')

-- Try a replica.
new_cfg_template.sharding[1].replicas.replica_1_a.rebalancer = nil
new_cfg_template.sharding[2].replicas.replica_2_b.rebalancer = true
new_cfg = vtest.config_new(new_cfg_template)
vtest.cluster_cfg(g, new_cfg)
t.assert_equals(find_rebalancer_name(g), 'replica_2_b')

-- Ensure it works even on the replica.
-- Pause the rebalancer and move every bucket of replicaset 1 into
-- replicaset 2 to create a disbalance for it to fix.
vtest.cluster_rebalancer_disable(g)
g.replica_1_a:exec(function(rs2_uuid)
while true do
local bid = _G.get_first_bucket()
if not bid then
break
end
local _, err = ivshard.storage.bucket_send(
bid, rs2_uuid, {timeout = iwait_timeout})
ilt.assert_equals(err, nil)
end
end, {g.replica_2_a:replicaset_uuid()})
vtest.cluster_rebalancer_enable(g)

-- Wait until the rebalancer service on the marked replica reports a
-- successful pass.
g.replica_2_b:exec(function()
ivtest.service_wait_for_new_ok(
ivshard.storage.internal.rebalancer_service, {
on_yield = ivshard.storage.rebalancer_wakeup
})
end)
-- After rebalancing and GC each replicaset must hold an equal share of
-- the buckets again.
local _, err = vtest.cluster_exec_each_master(g, function(bucket_count)
_G.bucket_gc_wait()
ilt.assert_equals(ivshard.storage.buckets_count(), bucket_count)
end, {new_cfg_template.bucket_count / #new_cfg_template.sharding})
t.assert_equals(err, nil)

-- Revert.
vtest.cluster_cfg(g, global_cfg)
end
46 changes: 46 additions & 0 deletions test/unit-luatest/config_test.lua
Original file line number Diff line number Diff line change
Expand Up @@ -91,3 +91,49 @@ g.test_replica_listen = function()
t.assert_equals(rep_1_a.uri, url1, 'uri value')
t.assert_equals(rep_1_a.listen, {url1, url2}, 'listen value')
end

g.test_rebalancer_mode = function()
    -- Validation of the 'rebalancer_mode' root option together with
    -- the per-replica 'rebalancer' flag: a single mark is accepted in
    -- every mode, a second mark is always an error.
    local config = {
        sharding = {
            storage_1_uuid = {
                replicas = {
                    storage_1_a_uuid = {
                        uri = 'storage:storage@127.0.0.1:3301',
                        name = 'storage_1_a',
                    },
                },
            },
            storage_2_uuid = {
                replicas = {
                    storage_2_a_uuid = {
                        uri = 'storage:storage@127.0.0.1:3302',
                        name = 'storage_2_a',
                    },
                },
            },
        },
        rebalancer_mode = 'off',
    }
    local replicas_1 = config.sharding.storage_1_uuid.replicas
    local replicas_2 = config.sharding.storage_2_uuid.replicas
    local storage_1_a = replicas_1.storage_1_a_uuid
    t.assert(vcfg.check(config), 'mode = off')
    storage_1_a.rebalancer = true
    t.assert(vcfg.check(config), 'mode = off with one marked')
    storage_1_a.rebalancer = nil

    config.rebalancer_mode = 'auto'
    t.assert(vcfg.check(config), 'mode = auto')
    storage_1_a.rebalancer = true
    t.assert(vcfg.check(config), 'mode = auto with one marked')
    storage_1_a.rebalancer = nil

    config.rebalancer_mode = 'manual'
    t.assert(vcfg.check(config), 'mode = manual')
    storage_1_a.rebalancer = true
    t.assert(vcfg.check(config), 'mode = manual with one marked')

    -- A second marked replica must be rejected.
    replicas_2.storage_2_a_uuid.rebalancer = true
    t.assert_error_msg_contains('More than one rebalancer is found',
                                vcfg.check, config)
end
4 changes: 2 additions & 2 deletions test/unit/config.result
Original file line number Diff line number Diff line change
Expand Up @@ -462,9 +462,9 @@ lcfg.check(cfg)['sharding']
- replicaset_uuid:
replicas:
replica_uuid:
master: true
uri: 127.0.0.1
name: storage
master: true
weight: 100000
...
replica.uri = 'user:password@localhost'
Expand All @@ -475,9 +475,9 @@ lcfg.check(cfg)['sharding']
- replicaset_uuid:
replicas:
replica_uuid:
master: true
uri: user:password@localhost
name: storage
master: true
weight: 100000
...
replica.url = old_uri
Expand Down
20 changes: 20 additions & 0 deletions vshard/cfg.lua
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,12 @@ local function check_replicaset_master(master)
end
end

-- Validate the root cfg option 'rebalancer_mode'. The value is already
-- known to be a string by the template type check; here only the set of
-- accepted values is enforced.
local function cfg_check_rebalancer_mode(mode)
    local is_known = mode == 'auto' or mode == 'manual' or mode == 'off'
    if not is_known then
        error('Only "auto", "manual", and "off" are supported')
    end
end

local function is_number(v)
return type(v) == 'number' and v == v
end
Expand Down Expand Up @@ -148,6 +154,7 @@ local replica_template = {
type = 'boolean', name = "Master", is_optional = true,
check = check_replica_master
},
rebalancer = {type = 'boolean', name = 'Rebalancer', is_optional = true},
}

local function check_replicas(replicas)
Expand Down Expand Up @@ -209,6 +216,7 @@ local function check_sharding(sharding)
local uuids = {}
local uris = {}
local names = {}
local rebalancer_uuid
local is_all_weights_zero = true
for replicaset_uuid, replicaset in pairs(sharding) do
if uuids[replicaset_uuid] then
Expand Down Expand Up @@ -252,6 +260,14 @@ local function check_sharding(sharding)
names[name] = 2
end
end
if replica.rebalancer then
if rebalancer_uuid then
error(string.format('More than one rebalancer is found: '..
'on replicas %s and %s', replica_uuid,
rebalancer_uuid))
end
rebalancer_uuid = replica_uuid
end
end
is_all_weights_zero = is_all_weights_zero and replicaset.weight == 0
end
Expand All @@ -278,6 +294,10 @@ local cfg_template = {
type = 'positive integer', name = 'Bucket count', is_optional = true,
default = consts.DEFAULT_BUCKET_COUNT
},
rebalancer_mode = {
type = 'string', name = 'Rebalancer mode', is_optional = true,
default = 'auto', check = cfg_check_rebalancer_mode,
},
rebalancer_disbalance_threshold = {
type = 'non-negative number', name = 'Rebalancer disbalance threshold',
is_optional = true,
Expand Down
41 changes: 34 additions & 7 deletions vshard/storage/init.lua
Original file line number Diff line number Diff line change
Expand Up @@ -3315,6 +3315,32 @@ end
-- Configuration
--------------------------------------------------------------------------------

-- Find the UUID of the instance which must run the rebalancer according
-- to the given vshard config, or nil when it must not run anywhere.
-- Mode 'off' - nobody; 'auto' - the master with the minimal UUID;
-- 'manual' - the replica marked with 'rebalancer = true', if any.
local function storage_cfg_find_rebalancer(cfg)
    local mode = cfg.rebalancer_mode
    if mode == 'off' then
        return nil
    end
    local min_master_uuid
    local marked_uuid
    for _, rs in pairs(cfg.sharding) do
        for uuid, replica in pairs(rs.replicas) do
            if replica.rebalancer then
                -- Config validation guarantees at most one mark.
                assert(marked_uuid == nil)
                marked_uuid = uuid
            end
            if replica.master then
                if min_master_uuid == nil or uuid < min_master_uuid then
                    min_master_uuid = uuid
                end
            end
        end
    end
    if mode == 'auto' then
        -- Auto mode ignores the marks entirely.
        return min_master_uuid
    end
    assert(mode == 'manual')
    return marked_uuid
end

local function storage_cfg(cfg, this_replica_uuid, is_reload)
if this_replica_uuid == nil then
error('Usage: cfg(configuration, this_replica_uuid)')
Expand All @@ -3337,13 +3363,8 @@ local function storage_cfg(cfg, this_replica_uuid, is_reload)
local this_replicaset
local this_replica
local new_replicasets = lreplicaset.buildall(vshard_cfg)
local min_master
for _, rs in pairs(new_replicasets) do
for replica_uuid, replica in pairs(rs.replicas) do
if (min_master == nil or replica_uuid < min_master.uuid) and
rs.master == replica then
min_master = replica
end
if replica_uuid == this_replica_uuid then
assert(this_replicaset == nil)
this_replicaset = rs
Expand Down Expand Up @@ -3481,7 +3502,8 @@ local function storage_cfg(cfg, this_replica_uuid, is_reload)
local_on_master_enable()
end

if min_master == this_replica then
local rebalancer_uuid = storage_cfg_find_rebalancer(vshard_cfg)
if rebalancer_uuid == this_replica.uuid then
if not M.rebalancer_fiber then
M.rebalancer_fiber =
util.reloadable_fiber_create('vshard.rebalancer', M,
Expand All @@ -3492,7 +3514,12 @@ local function storage_cfg(cfg, this_replica_uuid, is_reload)
M.rebalancer_fiber:wakeup()
end
elseif M.rebalancer_fiber then
log.info('Rebalancer location has changed to %s', min_master)
if rebalancer_uuid then
log.info('Rebalancer location has changed to instance %s',
rebalancer_uuid)
else
log.info('Rebalancer is turned off')
end
M.rebalancer_fiber:cancel()
M.rebalancer_fiber = nil
end
Expand Down

0 comments on commit 7af20eb

Please sign in to comment.