Skip to content

Commit

Permalink
rebalancer: introduce rebalancer flag
Browse files Browse the repository at this point in the history
The flag allows explicitly assigning the rebalancer role to a
specific instance or a replicaset, or forbidding its automatic
assignment to instances or whole replicasets.

Part of #432

NO_DOC=in the final commit with rebalancer options
  • Loading branch information
Gerold103 committed Nov 3, 2023
1 parent 8eae32a commit c09001c
Show file tree
Hide file tree
Showing 5 changed files with 219 additions and 5 deletions.
76 changes: 76 additions & 0 deletions test/storage-luatest/rebalancer_test.lua
Original file line number Diff line number Diff line change
Expand Up @@ -157,3 +157,79 @@ test_group.test_rebalancer_location = function(g)
vtest.cluster_cfg(g, global_cfg)
wait_rebalancer_on_instance(g, 'replica_1_a')
end

test_group.test_locate_with_flag = function(g)
    --
    -- Walk the cluster through a series of configs using the 'rebalancer'
    -- flag on replicasets and instances, and after each reconfiguration
    -- wait until the rebalancer is located on the expected instance (or
    -- nowhere, when the wait is called without an instance name).
    --
    t.assert_equals(vtest.cluster_rebalancer_find(g), 'replica_1_a')
    --
    -- Assign to another replicaset, with a non-minimal UUID.
    --
    local new_cfg_template = table.deepcopy(cfg_template)
    new_cfg_template.sharding[1].rebalancer = false
    new_cfg_template.sharding[2].rebalancer = true
    local new_global_cfg = vtest.config_new(new_cfg_template)
    vtest.cluster_cfg(g, new_global_cfg)
    wait_rebalancer_on_instance(g, 'replica_2_a')
    --
    -- Automatically move the rebalancer together with the master role.
    --
    new_cfg_template.sharding[2].replicas.replica_2_a.read_only = true
    new_cfg_template.sharding[2].replicas.replica_2_b.read_only = false
    new_global_cfg = vtest.config_new(new_cfg_template)
    vtest.cluster_cfg(g, new_global_cfg)
    wait_rebalancer_on_instance(g, 'replica_2_b')
    --
    -- Assign to the replicaset with the maximal UUID.
    --
    new_cfg_template.sharding[2].rebalancer = false
    new_cfg_template.sharding[3].rebalancer = true
    new_global_cfg = vtest.config_new(new_cfg_template)
    vtest.cluster_cfg(g, new_global_cfg)
    wait_rebalancer_on_instance(g, 'replica_3_a')
    --
    -- Assign explicitly to a read-only replica. The replicaset-level flag
    -- is dropped and the instance-level one is set instead, so the
    -- rebalancer has to follow the flag, not the master role.
    --
    new_cfg_template.sharding[3].rebalancer = nil
    new_cfg_template.sharding[3].replicas.replica_3_b.rebalancer = true
    new_global_cfg = vtest.config_new(new_cfg_template)
    vtest.cluster_cfg(g, new_global_cfg)
    wait_rebalancer_on_instance(g, 'replica_3_b')
    --
    -- Forbid the rebalancer on the min-UUID replicaset. Then the replicaset
    -- with the next UUID is used.
    --
    new_cfg_template = table.deepcopy(cfg_template)
    new_cfg_template.sharding[1].rebalancer = false
    new_global_cfg = vtest.config_new(new_cfg_template)
    vtest.cluster_cfg(g, new_global_cfg)
    wait_rebalancer_on_instance(g, 'replica_2_a')
    --
    -- Forbid to run on the current master. But on a replica it won't run
    -- without an explicit flag. Hence no rebalancer at all.
    --
    new_cfg_template.sharding[2].replicas.replica_2_a.rebalancer = false
    new_global_cfg = vtest.config_new(new_cfg_template)
    vtest.cluster_cfg(g, new_global_cfg)
    wait_rebalancer_on_instance(g)
    --
    -- The master appears on another instance. The rebalancer can finally start.
    --
    new_cfg_template.sharding[2].replicas.replica_2_b.read_only = false
    new_global_cfg = vtest.config_new(new_cfg_template)
    vtest.cluster_cfg(g, new_global_cfg)
    wait_rebalancer_on_instance(g, 'replica_2_b')
    --
    -- Explicitly assign to a replica in a non-min-UUID replicaset. Without
    -- setting this flag for any replicaset.
    --
    new_cfg_template = table.deepcopy(cfg_template)
    new_cfg_template.sharding[1].rebalancer = nil
    new_cfg_template.sharding[2].replicas.replica_2_b.rebalancer = true
    new_global_cfg = vtest.config_new(new_cfg_template)
    vtest.cluster_cfg(g, new_global_cfg)
    wait_rebalancer_on_instance(g, 'replica_2_b')
    --
    -- Cleanup.
    --
    vtest.cluster_cfg(g, global_cfg)
    wait_rebalancer_on_instance(g, 'replica_1_a')
end
83 changes: 83 additions & 0 deletions test/unit-luatest/config_test.lua
Original file line number Diff line number Diff line change
Expand Up @@ -128,3 +128,86 @@ g.test_extract_vshard = function()
replication_timeout = 10,
})
end

g.test_rebalancer_flag = function()
    --
    -- A minimal config: 2 replicasets with a single instance in each. The
    -- tables are kept in locals so the flags can be toggled in place
    -- between the checks.
    --
    local inst_1 = {
        uri = 'storage:storage@127.0.0.1:3301',
        name = 'storage_1_a',
    }
    local rs_1 = {
        replicas = {
            storage_1_a_uuid = inst_1,
        },
    }
    local inst_2 = {
        uri = 'storage:storage@127.0.0.1:3302',
        name = 'storage_2_a',
    }
    local rs_2 = {
        replicas = {
            storage_2_a_uuid = inst_2,
        },
    }
    local cfg = {
        sharding = {
            storage_1_uuid = rs_1,
            storage_2_uuid = rs_2,
        },
    }
    -- Validation of the current config must fail with the given message.
    local function expect_check_error(msg)
        t.assert_error_msg_content_equals(msg, vcfg.check, cfg)
    end

    -- Without any flags the config is valid.
    t.assert(vcfg.check(cfg))

    --
    -- Bad replica-rebalancer flag.
    --
    inst_1.rebalancer = 'test'
    expect_check_error('Rebalancer flag must be boolean')
    inst_1.rebalancer = nil

    --
    -- Bad replicaset-rebalancer flag.
    --
    rs_1.rebalancer = 'test'
    expect_check_error('Rebalancer flag must be boolean')
    rs_1.rebalancer = nil

    --
    -- Rebalancer flag for a replicaset and an instance.
    --
    inst_1.rebalancer = true
    rs_1.rebalancer = true
    expect_check_error(
        'Found 2 rebalancer flags at storage_1_uuid and storage_1_a_uuid')
    inst_1.rebalancer = nil
    rs_1.rebalancer = nil

    --
    -- Rebalancer flag for 2 replicasets.
    --
    rs_1.rebalancer = true
    rs_2.rebalancer = true
    expect_check_error(
        'Found 2 rebalancer flags at storage_1_uuid and storage_2_uuid')
    rs_1.rebalancer = nil
    rs_2.rebalancer = nil

    --
    -- Rebalancer flag for 2 instances.
    --
    inst_1.rebalancer = true
    inst_2.rebalancer = true
    expect_check_error(
        'Found 2 rebalancer flags at storage_1_a_uuid and storage_2_a_uuid')
    inst_1.rebalancer = nil
    inst_2.rebalancer = nil

    --
    -- Conflicting rebalancer flag in one replicaset.
    --
    rs_1.rebalancer = false
    inst_1.rebalancer = true
    expect_check_error(
        "Replicaset storage_1_uuid can't run the rebalancer, and yet " ..
        "it was explicitly assigned to its instance storage_1_a_uuid")
    rs_1.rebalancer = nil
    inst_1.rebalancer = nil
end
8 changes: 4 additions & 4 deletions test/unit/config.result
Original file line number Diff line number Diff line change
Expand Up @@ -460,25 +460,25 @@ replica.uri = '127.0.0.1'
lcfg.check(cfg)['sharding']
---
- replicaset_uuid:
weight: 100000
replicas:
replica_uuid:
master: true
uri: 127.0.0.1
name: storage
weight: 100000
master: true
...
replica.uri = 'user:password@localhost'
---
...
lcfg.check(cfg)['sharding']
---
- replicaset_uuid:
weight: 100000
replicas:
replica_uuid:
master: true
uri: user:password@localhost
name: storage
weight: 100000
master: true
...
replica.url = old_uri
---
Expand Down
23 changes: 23 additions & 0 deletions vshard/cfg.lua
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ local replica_template = {
type = 'boolean', name = "Master", is_optional = true,
check = check_replica_master
},
rebalancer = {type = 'boolean', name = 'Rebalancer flag', is_optional = true},
}

local function check_replicas(replicas)
Expand All @@ -169,6 +170,7 @@ local replicaset_template = {
type = 'string', name = 'Master search mode', is_optional = true,
check = check_replicaset_master
},
rebalancer = {type = 'boolean', name = 'Rebalancer flag', is_optional = true},
}

--
Expand Down Expand Up @@ -211,6 +213,7 @@ local function check_sharding(sharding)
local uris = {}
local names = {}
local is_all_weights_zero = true
local rebalancer_uuid
for replicaset_uuid, replicaset in pairs(sharding) do
if uuids[replicaset_uuid] then
error(string.format('Duplicate uuid %s', replicaset_uuid))
Expand All @@ -223,7 +226,15 @@ local function check_sharding(sharding)
if w == math.huge or w == -math.huge then
error('Replicaset weight can not be Inf')
end
if replicaset.rebalancer then
if rebalancer_uuid then
error(('Found 2 rebalancer flags at %s and %s'):format(
rebalancer_uuid, replicaset_uuid))
end
rebalancer_uuid = replicaset_uuid
end
validate_config(replicaset, replicaset_template)
local no_rebalancer = replicaset.rebalancer == false
local is_master_auto = replicaset.master == 'auto'
for replica_uuid, replica in pairs(replicaset.replicas) do
if uris[replica.uri] then
Expand All @@ -240,6 +251,18 @@ local function check_sharding(sharding)
'master flag in replica uuid %s',
replica_uuid))
end
if replica.rebalancer then
if rebalancer_uuid then
error(('Found 2 rebalancer flags at %s and %s'):format(
rebalancer_uuid, replica_uuid))
end
if no_rebalancer then
error(('Replicaset %s can\'t run the rebalancer, and yet '..
'it was explicitly assigned to its instance '..
'%s'):format(replicaset_uuid, replica_uuid))
end
rebalancer_uuid = replica_uuid
end
-- Log warning in case replica.name duplicate is
-- found. Message appears once for each unique
-- duplicate.
Expand Down
34 changes: 33 additions & 1 deletion vshard/storage/init.lua
Original file line number Diff line number Diff line change
Expand Up @@ -3243,27 +3243,50 @@ end
--
local function rebalancer_cfg_find_instance(cfg)
    -- Pick the instance UUID which should run the rebalancer, judging by
    -- the 'master' and 'rebalancer' fields in cfg.sharding. Returns nil
    -- when no instance fits.
    --
    -- Rules, as implemented below:
    -- * a replicaset or an instance with 'rebalancer == false' is never
    --   chosen;
    -- * an instance is a candidate when it is a master or has its own
    --   'rebalancer' flag set;
    -- * as soon as an explicit 'rebalancer' flag is met (on a replicaset or
    --   on an instance), only explicitly flagged candidates are accepted
    --   and any earlier automatic choice is dropped;
    -- * among equal candidates the smallest UUID wins, so the result does
    --   not depend on the pairs() iteration order.
    local target_uuid
    -- Becomes true once an explicit rebalancer flag was encountered.
    local is_assigned
    for _, rs in pairs(cfg.sharding) do
        -- The whole replicaset is skipped when the rebalancer is forbidden
        -- on it.
        if rs.rebalancer ~= false then
            for replica_uuid, replica in pairs(rs.replicas) do
                local is_rebalancer = rs.rebalancer or replica.rebalancer
                local no_rebalancer = replica.rebalancer == false
                if is_rebalancer and not is_assigned then
                    -- First explicit flag - forget the automatic choice.
                    is_assigned = true
                    target_uuid = nil
                end
                local ok = true
                ok = ok and not no_rebalancer
                -- The master role is not mandatory: an explicit instance
                -- flag is enough to run the rebalancer on a replica.
                ok = ok and (replica.master or replica.rebalancer)
                ok = ok and (not target_uuid or replica_uuid < target_uuid)
                ok = ok and (not is_assigned or is_rebalancer)
                if ok then
                    target_uuid = replica_uuid
                end
            end
        end
    end
    return target_uuid
end

local function rebalancer_cfg_find_replicaset(cfg)
local target_uuid
local is_assigned
for rs_uuid, rs in pairs(cfg.sharding) do
local is_rebalancer = rs.rebalancer
local no_rebalancer = rs.rebalancer == false
if is_rebalancer and not is_assigned then
is_assigned = true
target_uuid = nil
end
local ok = true
ok = ok and not no_rebalancer
ok = ok and (rs.master == 'auto')
ok = ok and (not target_uuid or rs_uuid < target_uuid)
ok = ok and (not is_assigned or is_rebalancer)
if ok then
target_uuid = rs_uuid
is_assigned = is_rebalancer
end
end
return target_uuid
Expand All @@ -3274,6 +3297,15 @@ local function rebalancer_is_needed()
local this_replica_uuid = M.this_replica.uuid
local this_replicaset_uuid = M.this_replicaset.uuid

local this_replicaset_cfg = cfg.sharding[this_replicaset_uuid]
if this_replicaset_cfg.rebalancer == false then
return false
end
local this_replica_cfg = this_replicaset_cfg.replicas[this_replica_uuid]
if this_replica_cfg.rebalancer == false then
return false
end

local uuid = rebalancer_cfg_find_instance(cfg)
if uuid then
return this_replica_uuid == uuid
Expand Down

0 comments on commit c09001c

Please sign in to comment.