From 0e2c511e6a8ba8542cf02a2ee0d9931c3f11477b Mon Sep 17 00:00:00 2001
From: Vladislav Shpilevoy
Date: Thu, 28 Sep 2023 19:39:59 +0200
Subject: [PATCH] rebalancer: introduce rebalancer flag

The flag allows explicitly assigning the rebalancer role to a specific
instance or a replicaset, or forbidding its automatic assignment to
instances or whole replicasets.

Part of #432

NO_DOC=in the final commit with rebalancer options
---
 test/storage-luatest/rebalancer_test.lua | 76 ++++++++++++++++++++++
 test/unit-luatest/config_test.lua        | 83 ++++++++++++++++++++++++
 test/unit/config.result                  |  8 +--
 vshard/cfg.lua                           | 23 +++++++
 vshard/storage/init.lua                  | 34 +++++++++-
 5 files changed, 219 insertions(+), 5 deletions(-)

diff --git a/test/storage-luatest/rebalancer_test.lua b/test/storage-luatest/rebalancer_test.lua
index bc817dd6..08353a81 100644
--- a/test/storage-luatest/rebalancer_test.lua
+++ b/test/storage-luatest/rebalancer_test.lua
@@ -157,3 +157,79 @@ test_group.test_rebalancer_location = function(g)
     vtest.cluster_cfg(g, global_cfg)
     wait_rebalancer_on_instance(g, 'replica_1_a')
 end
+
+test_group.test_locate_with_flag = function(g)
+    t.assert_equals(vtest.cluster_rebalancer_find(g), 'replica_1_a')
+    --
+    -- Assign to another replicaset, with a non-minimal UUID.
+    --
+    local new_cfg_template = table.deepcopy(cfg_template)
+    new_cfg_template.sharding[1].rebalancer = false
+    new_cfg_template.sharding[2].rebalancer = true
+    local new_global_cfg = vtest.config_new(new_cfg_template)
+    vtest.cluster_cfg(g, new_global_cfg)
+    wait_rebalancer_on_instance(g, 'replica_2_a')
+    --
+    -- Automatically move the rebalancer together with the master role.
+    --
+    new_cfg_template.sharding[2].replicas.replica_2_a.read_only = true
+    new_cfg_template.sharding[2].replicas.replica_2_b.read_only = false
+    new_global_cfg = vtest.config_new(new_cfg_template)
+    vtest.cluster_cfg(g, new_global_cfg)
+    wait_rebalancer_on_instance(g, 'replica_2_b')
+    --
+    -- Assign to the replicaset with the maximal UUID.
+    --
+    new_cfg_template.sharding[2].rebalancer = false
+    new_cfg_template.sharding[3].rebalancer = true
+    new_global_cfg = vtest.config_new(new_cfg_template)
+    vtest.cluster_cfg(g, new_global_cfg)
+    wait_rebalancer_on_instance(g, 'replica_3_a')
+    --
+    -- Assign explicitly to a read-only replica.
+    --
+    new_cfg_template.sharding[3].master = nil
+    new_cfg_template.sharding[3].replicas.replica_3_b.master = true
+    new_global_cfg = vtest.config_new(new_cfg_template)
+    vtest.cluster_cfg(g, new_global_cfg)
+    wait_rebalancer_on_instance(g, 'replica_3_b')
+    --
+    -- Forbid the rebalancer on the min-UUID replicaset. Then the replicaset
+    -- with the next UUID is used.
+    --
+    new_cfg_template = table.deepcopy(cfg_template)
+    new_cfg_template.sharding[1].rebalancer = false
+    new_global_cfg = vtest.config_new(new_cfg_template)
+    vtest.cluster_cfg(g, new_global_cfg)
+    wait_rebalancer_on_instance(g, 'replica_2_a')
+    --
+    -- Forbid running on the current master. But on a replica it won't run
+    -- without an explicit flag. Hence no rebalancer at all.
+    --
+    new_cfg_template.sharding[2].replicas.replica_2_a.rebalancer = false
+    new_global_cfg = vtest.config_new(new_cfg_template)
+    vtest.cluster_cfg(g, new_global_cfg)
+    wait_rebalancer_on_instance(g)
+    --
+    -- The master appears on another instance. The rebalancer can finally start.
+    --
+    new_cfg_template.sharding[2].replicas.replica_2_b.read_only = false
+    new_global_cfg = vtest.config_new(new_cfg_template)
+    vtest.cluster_cfg(g, new_global_cfg)
+    wait_rebalancer_on_instance(g, 'replica_2_b')
+    --
+    -- Explicitly assign to a replica in a non-min-UUID replicaset. Without
+    -- setting this flag for any replicaset.
+    --
+    new_cfg_template = table.deepcopy(cfg_template)
+    new_cfg_template.sharding[1].rebalancer = nil
+    new_cfg_template.sharding[2].replicas.replica_2_b.rebalancer = true
+    new_global_cfg = vtest.config_new(new_cfg_template)
+    vtest.cluster_cfg(g, new_global_cfg)
+    wait_rebalancer_on_instance(g, 'replica_2_b')
+    --
+    -- Cleanup.
+    --
+    vtest.cluster_cfg(g, global_cfg)
+    wait_rebalancer_on_instance(g, 'replica_1_a')
+end
diff --git a/test/unit-luatest/config_test.lua b/test/unit-luatest/config_test.lua
index 95790dd3..5c83d30b 100644
--- a/test/unit-luatest/config_test.lua
+++ b/test/unit-luatest/config_test.lua
@@ -128,3 +128,86 @@ g.test_extract_vshard = function()
         replication_timeout = 10,
     })
 end
+
+g.test_rebalancer_flag = function()
+    local storage_1_a = {
+        uri = 'storage:storage@127.0.0.1:3301',
+        name = 'storage_1_a',
+    }
+    local replicaset_1 = {
+        replicas = {
+            storage_1_a_uuid = storage_1_a,
+        },
+    }
+    local storage_2_a = {
+        uri = 'storage:storage@127.0.0.1:3302',
+        name = 'storage_2_a',
+    }
+    local replicaset_2 = {
+        replicas = {
+            storage_2_a_uuid = storage_2_a,
+        },
+    }
+    local config = {
+        sharding = {
+            storage_1_uuid = replicaset_1,
+            storage_2_uuid = replicaset_2,
+        },
+    }
+    t.assert(vcfg.check(config))
+    --
+    -- Bad replica-rebalancer flag.
+    --
+    storage_1_a.rebalancer = 'test'
+    t.assert_error_msg_content_equals(
+        'Rebalancer flag must be boolean', vcfg.check, config)
+    storage_1_a.rebalancer = nil
+    --
+    -- Bad replicaset-rebalancer flag.
+    --
+    replicaset_1.rebalancer = 'test'
+    t.assert_error_msg_content_equals(
+        'Rebalancer flag must be boolean', vcfg.check, config)
+    replicaset_1.rebalancer = nil
+    --
+    -- Rebalancer flag for a replicaset and an instance.
+    --
+    storage_1_a.rebalancer = true
+    replicaset_1.rebalancer = true
+    t.assert_error_msg_content_equals(
+        'Found 2 rebalancer flags at storage_1_uuid and storage_1_a_uuid',
+        vcfg.check, config)
+    storage_1_a.rebalancer = nil
+    replicaset_1.rebalancer = nil
+    --
+    -- Rebalancer flag for 2 replicasets.
+    --
+    replicaset_1.rebalancer = true
+    replicaset_2.rebalancer = true
+    t.assert_error_msg_content_equals(
+        'Found 2 rebalancer flags at storage_1_uuid and storage_2_uuid',
+        vcfg.check, config)
+    replicaset_1.rebalancer = nil
+    replicaset_2.rebalancer = nil
+    --
+    -- Rebalancer flag for 2 instances.
+    --
+    storage_1_a.rebalancer = true
+    storage_2_a.rebalancer = true
+    t.assert_error_msg_content_equals(
+        'Found 2 rebalancer flags at storage_1_a_uuid and storage_2_a_uuid',
+        vcfg.check, config)
+    storage_1_a.rebalancer = nil
+    storage_2_a.rebalancer = nil
+    --
+    -- Conflicting rebalancer flag in one replicaset.
+    --
+    replicaset_1.rebalancer = false
+    storage_1_a.rebalancer = true
+    t.assert_error_msg_content_equals(
+        'Replicaset storage_1_uuid can\'t run the rebalancer, and yet it was '..
+        'explicitly assigned to its instance storage_1_a_uuid',
+        vcfg.check, config)
+    replicaset_1.rebalancer = nil
+    storage_1_a.rebalancer = nil
+end
diff --git a/test/unit/config.result b/test/unit/config.result
index ff19289f..66fd520b 100644
--- a/test/unit/config.result
+++ b/test/unit/config.result
@@ -460,12 +460,12 @@ replica.uri = '127.0.0.1'
 lcfg.check(cfg)['sharding']
 ---
 - replicaset_uuid:
+    weight: 100000
     replicas:
       replica_uuid:
-        master: true
         uri: 127.0.0.1
         name: storage
-    weight: 100000
+        master: true
 ...
 replica.uri = 'user:password@localhost'
 ---
@@ -473,12 +473,12 @@ replica.uri = 'user:password@localhost'
 lcfg.check(cfg)['sharding']
 ---
 - replicaset_uuid:
+    weight: 100000
     replicas:
       replica_uuid:
-        master: true
         uri: user:password@localhost
         name: storage
-    weight: 100000
+        master: true
 ...
 replica.url = old_uri
 ---
diff --git a/vshard/cfg.lua b/vshard/cfg.lua
index 6e93874d..74dd9e41 100644
--- a/vshard/cfg.lua
+++ b/vshard/cfg.lua
@@ -149,6 +149,7 @@ local replica_template = {
         type = 'boolean', name = "Master", is_optional = true,
         check = check_replica_master
     },
+    rebalancer = {type = 'boolean', name = 'Rebalancer flag', is_optional = true},
 }
 
 local function check_replicas(replicas)
@@ -169,6 +170,7 @@ local replicaset_template = {
         type = 'string', name = 'Master search mode', is_optional = true,
         check = check_replicaset_master
     },
+    rebalancer = {type = 'boolean', name = 'Rebalancer flag', is_optional = true},
 }
 
 --
@@ -211,6 +213,7 @@ local function check_sharding(sharding)
     local uris = {}
     local names = {}
     local is_all_weights_zero = true
+    local rebalancer_uuid
     for replicaset_uuid, replicaset in pairs(sharding) do
         if uuids[replicaset_uuid] then
             error(string.format('Duplicate uuid %s', replicaset_uuid))
@@ -223,7 +226,15 @@ local function check_sharding(sharding)
         if w == math.huge or w == -math.huge then
             error('Replicaset weight can not be Inf')
         end
+        if replicaset.rebalancer then
+            if rebalancer_uuid then
+                error(('Found 2 rebalancer flags at %s and %s'):format(
+                    rebalancer_uuid, replicaset_uuid))
+            end
+            rebalancer_uuid = replicaset_uuid
+        end
         validate_config(replicaset, replicaset_template)
+        local no_rebalancer = replicaset.rebalancer == false
         local is_master_auto = replicaset.master == 'auto'
         for replica_uuid, replica in pairs(replicaset.replicas) do
             if uris[replica.uri] then
@@ -240,6 +251,18 @@ local function check_sharding(sharding)
                                     'master flag in replica uuid %s',
                                     replica_uuid))
             end
+            if replica.rebalancer then
+                if rebalancer_uuid then
+                    error(('Found 2 rebalancer flags at %s and %s'):format(
+                        rebalancer_uuid, replica_uuid))
+                end
+                if no_rebalancer then
+                    error(('Replicaset %s can\'t run the rebalancer, and yet '..
+                           'it was explicitly assigned to its instance '..
+                           '%s'):format(replicaset_uuid, replica_uuid))
+                end
+                rebalancer_uuid = replica_uuid
+            end
             -- Log warning in case replica.name duplicate is
             -- found. Message appears once for each unique
             -- duplicate.
diff --git a/vshard/storage/init.lua b/vshard/storage/init.lua
index d01a0064..285dc2ae 100644
--- a/vshard/storage/init.lua
+++ b/vshard/storage/init.lua
@@ -3244,27 +3244,50 @@ end
 --
 local function rebalancer_cfg_find_instance(cfg)
     local target_uuid
+    local is_assigned
     for _, rs in pairs(cfg.sharding) do
+        if rs.rebalancer == false then
+            goto next_rs
+        end
         for replica_uuid, replica in pairs(rs.replicas) do
+            local is_rebalancer = rs.rebalancer or replica.rebalancer
+            local no_rebalancer = replica.rebalancer == false
+            if is_rebalancer and not is_assigned then
+                is_assigned = true
+                target_uuid = nil
+            end
             local ok = true
-            ok = ok and replica.master
+            ok = ok and not no_rebalancer
+            ok = ok and (replica.master or replica.rebalancer)
             ok = ok and (not target_uuid or replica_uuid < target_uuid)
+            ok = ok and (not is_assigned or is_rebalancer)
             if ok then
                 target_uuid = replica_uuid
             end
         end
+        ::next_rs::
     end
     return target_uuid
 end
 
 local function rebalancer_cfg_find_replicaset(cfg)
     local target_uuid
+    local is_assigned
     for rs_uuid, rs in pairs(cfg.sharding) do
+        local is_rebalancer = rs.rebalancer
+        local no_rebalancer = rs.rebalancer == false
+        if is_rebalancer and not is_assigned then
+            is_assigned = true
+            target_uuid = nil
+        end
         local ok = true
+        ok = ok and not no_rebalancer
         ok = ok and (rs.master == 'auto')
         ok = ok and (not target_uuid or rs_uuid < target_uuid)
+        ok = ok and (not is_assigned or is_rebalancer)
         if ok then
             target_uuid = rs_uuid
+            is_assigned = is_rebalancer
         end
     end
     return target_uuid
@@ -3275,6 +3298,15 @@ local function rebalancer_is_needed()
     local this_replica_uuid = M.this_replica.uuid
     local this_replicaset_uuid = M.this_replicaset.uuid
 
+    local this_replicaset_cfg = cfg.sharding[this_replicaset_uuid]
+    if this_replicaset_cfg.rebalancer == false then
+        return false
+    end
+    local this_replica_cfg = this_replicaset_cfg.replicas[this_replica_uuid]
+    if this_replica_cfg.rebalancer == false then
+        return false
+    end
+
     local uuid = rebalancer_cfg_find_instance(cfg)
     if uuid then
         return this_replica_uuid == uuid
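
---
Editor's note, not part of the patch: below is a minimal configuration sketch showing how the new flag is meant to be used, based on the cfg templates and tests above. The UUIDs, URIs, names and bucket_count value are placeholders invented for illustration; only the placement and semantics of the rebalancer option come from the patch itself.

local vshard = require('vshard')

local cfg = {
    bucket_count = 3000,
    sharding = {
        ['aaaaaaaa-0000-4000-a000-000000000001'] = {
            -- Never run the rebalancer anywhere in this replicaset.
            rebalancer = false,
            replicas = {
                ['aaaaaaaa-0000-4000-a000-000000000002'] = {
                    uri = 'storage:storage@127.0.0.1:3301',
                    name = 'storage_1_a',
                    master = true,
                },
            },
        },
        ['bbbbbbbb-0000-4000-b000-000000000001'] = {
            replicas = {
                ['bbbbbbbb-0000-4000-b000-000000000002'] = {
                    uri = 'storage:storage@127.0.0.1:3302',
                    name = 'storage_2_a',
                    master = true,
                    -- Pin the rebalancer to this exact instance.
                    rebalancer = true,
                },
            },
        },
    },
}

-- Applied on each storage node as usual, e.g. on storage_2_a:
-- vshard.storage.cfg(cfg, 'bbbbbbbb-0000-4000-b000-000000000002')

With no flags in the config the old behaviour is kept and the rebalancer runs on the master of the replicaset with the minimal UUID. A single rebalancer = true flag overrides that choice, while rebalancer = false excludes an instance or a whole replicaset from the automatic selection, as exercised by the tests in this patch.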