diff --git a/test/storage-luatest/storage_2_2_test.lua b/test/storage-luatest/storage_2_2_test.lua
new file mode 100644
index 00000000..c33955f7
--- /dev/null
+++ b/test/storage-luatest/storage_2_2_test.lua
@@ -0,0 +1,182 @@
+local t = require('luatest')
+local vtest = require('test.luatest_helpers.vtest')
+local vutil = require('vshard.util')
+
+local group_config = {{engine = 'memtx'}, {engine = 'vinyl'}}
+
+if vutil.feature.memtx_mvcc then
+    table.insert(group_config, {
+        engine = 'memtx', memtx_use_mvcc_engine = true
+    })
+    table.insert(group_config, {
+        engine = 'vinyl', memtx_use_mvcc_engine = true
+    })
+end
+
+local test_group = t.group('storage', group_config)
+
+local cfg_template = {
+    sharding = {
+        {
+            replicas = {
+                replica_1_a = {
+                    master = true,
+                },
+                replica_1_b = {},
+            },
+        },
+        {
+            replicas = {
+                replica_2_a = {
+                    master = true,
+                },
+                replica_2_b = {},
+            },
+        },
+    },
+    bucket_count = 16,
+    rebalancer_mode = 'off',
+}
+local global_cfg
+
+test_group.before_all(function(g)
+    cfg_template.memtx_use_mvcc_engine = g.params.memtx_use_mvcc_engine
+    global_cfg = vtest.config_new(cfg_template)
+
+    vtest.cluster_new(g, global_cfg)
+    vtest.cluster_bootstrap(g, global_cfg)
+end)
+
+test_group.after_all(function(g)
+    g.cluster:drop()
+end)
+
+local function find_rebalancer_name(g)
+    local map, err = vtest.cluster_exec_each(g, function()
+        return ivshard.storage.internal.rebalancer_fiber ~= nil
+    end)
+    t.assert_equals(err, nil)
+    local rebalancer_name
+    for name, has_rebalancer in pairs(map) do
+        if has_rebalancer then
+            t.assert_equals(rebalancer_name, nil)
+            rebalancer_name = name
+        end
+    end
+    return rebalancer_name
+end
+
+test_group.test_rebalancer_mode_off_no_marks = function(g)
+    local _, err = vtest.cluster_exec_each(g, function()
+        ilt.assert(not ivshard.storage.internal.rebalancer_fiber)
+    end)
+    t.assert_equals(err, nil)
+end
+
+test_group.test_rebalancer_mode_off_ignore_marks = function(g)
+    local new_cfg_template = table.deepcopy(cfg_template)
+    new_cfg_template.sharding[1].replicas.replica_1_a.rebalancer = true
+    local new_cfg = vtest.config_new(new_cfg_template)
+    vtest.cluster_cfg(g, new_cfg)
+    local _, err = vtest.cluster_exec_each(g, function()
+        ilt.assert(not ivshard.storage.internal.rebalancer_fiber)
+    end)
+    t.assert_equals(err, nil)
+    -- Revert.
+    vtest.cluster_cfg(g, global_cfg)
+end
+
+test_group.test_rebalancer_mode_auto_ignore_marks = function(g)
+    local new_cfg_template = table.deepcopy(cfg_template)
+    new_cfg_template.rebalancer_mode = 'auto'
+    local new_cfg = vtest.config_new(new_cfg_template)
+    local map, err = vtest.cluster_exec_each(g, function(cfg)
+        -- Tell each instance it should be the rebalancer. But it must be
+        -- ignored when mode = 'auto'.
+        local info = box.info
+        cfg.sharding[ivutil.replicaset_uuid(info)]
+            .replicas[info.uuid].rebalancer = true
+        local _, err = ivshard.storage.cfg(cfg, box.info.uuid)
+        ilt.assert_equals(err, nil)
+        return {
+            has_rebalancer = ivshard.storage.internal.rebalancer_fiber ~= nil,
+            uuid = box.info.uuid,
+        }
+    end, {new_cfg})
+    t.assert_equals(err, nil)
+    local min_uuid
+    local rebalancer_uuid
+    for _, res in pairs(map) do
+        if not min_uuid or min_uuid > res.uuid then
+            min_uuid = res.uuid
+        end
+        if res.has_rebalancer then
+            t.assert_equals(rebalancer_uuid, nil)
+            rebalancer_uuid = res.uuid
+        end
+    end
+    t.assert_equals(min_uuid, rebalancer_uuid)
+    -- Revert.
+    vtest.cluster_cfg(g, global_cfg)
+end
+
+test_group.test_rebalancer_mode_manual_no_marks = function(g)
+    local new_cfg_template = table.deepcopy(cfg_template)
+    new_cfg_template.rebalancer_mode = 'manual'
+    local new_cfg = vtest.config_new(new_cfg_template)
+    vtest.cluster_cfg(g, new_cfg)
+    local _, err = vtest.cluster_exec_each(g, function()
+        ilt.assert(not ivshard.storage.internal.rebalancer_fiber)
+    end)
+    t.assert_equals(err, nil)
+    -- Revert.
+    vtest.cluster_cfg(g, global_cfg)
+end
+
+test_group.test_rebalancer_mode_manual_with_mark = function(g)
+    local new_cfg_template = table.deepcopy(cfg_template)
+    new_cfg_template.rebalancer_mode = 'manual'
+
+    -- Try a master.
+    new_cfg_template.sharding[1].replicas.replica_1_a.rebalancer = true
+    local new_cfg = vtest.config_new(new_cfg_template)
+    vtest.cluster_cfg(g, new_cfg)
+    t.assert_equals(find_rebalancer_name(g), 'replica_1_a')
+
+    -- Try a replica.
+    new_cfg_template.sharding[1].replicas.replica_1_a.rebalancer = nil
+    new_cfg_template.sharding[2].replicas.replica_2_b.rebalancer = true
+    new_cfg = vtest.config_new(new_cfg_template)
+    vtest.cluster_cfg(g, new_cfg)
+    t.assert_equals(find_rebalancer_name(g), 'replica_2_b')
+
+    -- Ensure it works even on the replica.
+    vtest.cluster_rebalancer_disable(g)
+    g.replica_1_a:exec(function(rs2_uuid)
+        while true do
+            local bid = _G.get_first_bucket()
+            if not bid then
+                break
+            end
+            local _, err = ivshard.storage.bucket_send(
+                bid, rs2_uuid, {timeout = iwait_timeout})
+            ilt.assert_equals(err, nil)
+        end
+    end, {g.replica_2_a:replicaset_uuid()})
+    vtest.cluster_rebalancer_enable(g)
+
+    g.replica_2_b:exec(function()
+        ivtest.service_wait_for_new_ok(
+            ivshard.storage.internal.rebalancer_service, {
+                on_yield = ivshard.storage.rebalancer_wakeup
+            })
+    end)
+    local _, err = vtest.cluster_exec_each_master(g, function(bucket_count)
+        _G.bucket_gc_wait()
+        ilt.assert_equals(ivshard.storage.buckets_count(), bucket_count)
+    end, {new_cfg_template.bucket_count / #new_cfg_template.sharding})
+    t.assert_equals(err, nil)
+
+    -- Revert.
+    vtest.cluster_cfg(g, global_cfg)
+end
diff --git a/test/unit-luatest/config_test.lua b/test/unit-luatest/config_test.lua
index 741847d9..29737b6c 100644
--- a/test/unit-luatest/config_test.lua
+++ b/test/unit-luatest/config_test.lua
@@ -91,3 +91,49 @@ g.test_replica_listen = function()
     t.assert_equals(rep_1_a.uri, url1, 'uri value')
     t.assert_equals(rep_1_a.listen, {url1, url2}, 'listen value')
 end
+
+g.test_rebalancer_mode = function()
+    local config = {
+        sharding = {
+            storage_1_uuid = {
+                replicas = {
+                    storage_1_a_uuid = {
+                        uri = 'storage:storage@127.0.0.1:3301',
+                        name = 'storage_1_a',
+                    },
+                },
+            },
+            storage_2_uuid = {
+                replicas = {
+                    storage_2_a_uuid = {
+                        uri = 'storage:storage@127.0.0.1:3302',
+                        name = 'storage_2_a',
+                    },
+                }
+            }
+        },
+        rebalancer_mode = 'off',
+    }
+    t.assert(vcfg.check(config), 'mode = off')
+    local sharding = config.sharding
+    local storage_1_a = sharding.storage_1_uuid.replicas.storage_1_a_uuid
+    storage_1_a.rebalancer = true
+    t.assert(vcfg.check(config), 'mode = off with one marked')
+    storage_1_a.rebalancer = nil
+
+    config.rebalancer_mode = 'auto'
+    t.assert(vcfg.check(config), 'mode = auto')
+    storage_1_a.rebalancer = true
+    t.assert(vcfg.check(config), 'mode = auto with one marked')
+    storage_1_a.rebalancer = nil
+
+    config.rebalancer_mode = 'manual'
+    t.assert(vcfg.check(config), 'mode = manual')
+    storage_1_a.rebalancer = true
+    t.assert(vcfg.check(config), 'mode = manual with one marked')
+
+    local storage_2_a = sharding.storage_2_uuid.replicas.storage_2_a_uuid
+    storage_2_a.rebalancer = true
+    t.assert_error_msg_contains('More than one rebalancer is found',
+                                vcfg.check, config)
+end
diff --git a/test/unit/config.result b/test/unit/config.result
index ff19289f..4fb925f5 100644
--- a/test/unit/config.result
+++ b/test/unit/config.result
@@ -462,9 +462,9 @@ lcfg.check(cfg)['sharding']
 - replicaset_uuid:
     replicas:
       replica_uuid:
-        master: true
         uri: 127.0.0.1
         name: storage
+        master: true
         weight: 100000
 ...
 replica.uri = 'user:password@localhost'
@@ -475,9 +475,9 @@ lcfg.check(cfg)['sharding']
 - replicaset_uuid:
     replicas:
      replica_uuid:
-        master: true
         uri: user:password@localhost
         name: storage
+        master: true
         weight: 100000
 ...
 replica.url = old_uri
diff --git a/vshard/cfg.lua b/vshard/cfg.lua
index e0445e3b..4fbfeeb9 100644
--- a/vshard/cfg.lua
+++ b/vshard/cfg.lua
@@ -44,6 +44,12 @@ local function check_replicaset_master(master)
     end
 end
 
+local function cfg_check_rebalancer_mode(mode)
+    if mode ~= 'auto' and mode ~= 'manual' and mode ~= 'off' then
+        error('Only "auto", "manual", and "off" are supported')
+    end
+end
+
 local function is_number(v)
     return type(v) == 'number' and v == v
 end
@@ -148,6 +154,7 @@ local replica_template = {
         type = 'boolean', name = "Master", is_optional = true,
         check = check_replica_master
     },
+    rebalancer = {type = 'boolean', name = 'Rebalancer', is_optional = true},
 }
 
 local function check_replicas(replicas)
@@ -209,6 +216,7 @@ local function check_sharding(sharding)
     local uuids = {}
     local uris = {}
     local names = {}
+    local rebalancer_uuid
     local is_all_weights_zero = true
     for replicaset_uuid, replicaset in pairs(sharding) do
         if uuids[replicaset_uuid] then
@@ -252,6 +260,14 @@ local function check_sharding(sharding)
                     names[name] = 2
                 end
             end
+            if replica.rebalancer then
+                if rebalancer_uuid then
+                    error(string.format('More than one rebalancer is found: '..
+                                        'on replicas %s and %s', replica_uuid,
+                                        rebalancer_uuid))
+                end
+                rebalancer_uuid = replica_uuid
+            end
         end
         is_all_weights_zero = is_all_weights_zero and replicaset.weight == 0
     end
@@ -278,6 +294,10 @@ local cfg_template = {
         type = 'positive integer', name = 'Bucket count', is_optional = true,
         default = consts.DEFAULT_BUCKET_COUNT
     },
+    rebalancer_mode = {
+        type = 'string', name = 'Rebalancer mode', is_optional = true,
+        default = 'auto', check = cfg_check_rebalancer_mode,
+    },
     rebalancer_disbalance_threshold = {
         type = 'non-negative number', name = 'Rebalancer disbalance threshold',
         is_optional = true,
diff --git a/vshard/storage/init.lua b/vshard/storage/init.lua
index 2213dc55..5f344108 100644
--- a/vshard/storage/init.lua
+++ b/vshard/storage/init.lua
@@ -3315,6 +3315,32 @@ end
 -- Configuration
 --------------------------------------------------------------------------------
 
+local function storage_cfg_find_rebalancer(cfg)
+    local mode = cfg.rebalancer_mode
+    if mode == 'off' then
+        return
+    end
+    local min_master_uuid
+    local rebalancer_uuid
+    for _, rs in pairs(cfg.sharding) do
+        for replica_uuid, replica in pairs(rs.replicas) do
+            if replica.rebalancer then
+                assert(not rebalancer_uuid)
+                rebalancer_uuid = replica_uuid
+            end
+            if replica.master and (not min_master_uuid or
+               replica_uuid < min_master_uuid) then
+                min_master_uuid = replica_uuid
+            end
+        end
+    end
+    if mode == 'auto' then
+        return min_master_uuid
+    end
+    assert(mode == 'manual')
+    return rebalancer_uuid
+end
+
 local function storage_cfg(cfg, this_replica_uuid, is_reload)
     if this_replica_uuid == nil then
         error('Usage: cfg(configuration, this_replica_uuid)')
@@ -3337,13 +3363,8 @@ local function storage_cfg(cfg, this_replica_uuid, is_reload)
     local this_replicaset
     local this_replica
     local new_replicasets = lreplicaset.buildall(vshard_cfg)
-    local min_master
     for _, rs in pairs(new_replicasets) do
         for replica_uuid, replica in pairs(rs.replicas) do
-            if (min_master == nil or replica_uuid < min_master.uuid) and
-               rs.master == replica then
-                min_master = replica
-            end
             if replica_uuid == this_replica_uuid then
                 assert(this_replicaset == nil)
                 this_replicaset = rs
@@ -3481,7 +3502,8 @@ local function storage_cfg(cfg, this_replica_uuid, is_reload)
         local_on_master_enable()
     end
 
-    if min_master == this_replica then
+    local rebalancer_uuid = storage_cfg_find_rebalancer(vshard_cfg)
+    if rebalancer_uuid == this_replica.uuid then
         if not M.rebalancer_fiber then
             M.rebalancer_fiber =
                 util.reloadable_fiber_create('vshard.rebalancer', M,
@@ -3492,7 +3514,12 @@ local function storage_cfg(cfg, this_replica_uuid, is_reload)
             M.rebalancer_fiber:wakeup()
         end
     elseif M.rebalancer_fiber then
-        log.info('Rebalancer location has changed to %s', min_master)
+        if rebalancer_uuid then
+            log.info('Rebalancer location has changed to instance %s',
+                     rebalancer_uuid)
+        else
+            log.info('Rebalancer is turned off')
+        end
         M.rebalancer_fiber:cancel()
         M.rebalancer_fiber = nil
     end
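
Usage sketch (illustrative, not part of the patch): with this change the
rebalancer can be pinned to one instance or disabled entirely. The UUIDs and
URIs below are placeholders; only the rebalancer_mode and rebalancer fields
come from this patch, the rest is an ordinary vshard storage configuration.

    local vshard = require('vshard')
    local cfg = {
        -- 'auto' (default): the master with the smallest UUID runs the
        -- rebalancer. 'manual': only the replica marked below runs it,
        -- master or not. 'off': no instance runs it.
        rebalancer_mode = 'manual',
        sharding = {
            ['<replicaset_uuid>'] = {
                replicas = {
                    ['<replica_uuid>'] = {
                        uri = 'storage:storage@127.0.0.1:3301',
                        name = 'storage_1_a',
                        master = true,
                        -- At most one replica in the whole config may be
                        -- marked; the config check rejects a second mark.
                        rebalancer = true,
                    },
                },
            },
        },
    }
    vshard.storage.cfg(cfg, '<replica_uuid>')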