storage: allow to choose rebalancer location #431

Closed · wants to merge 1 commit
182 changes: 182 additions & 0 deletions test/storage-luatest/storage_2_2_test.lua
@@ -0,0 +1,182 @@
local t = require('luatest')
local vtest = require('test.luatest_helpers.vtest')
local vutil = require('vshard.util')

local group_config = {{engine = 'memtx'}, {engine = 'vinyl'}}

if vutil.feature.memtx_mvcc then
    table.insert(group_config, {
        engine = 'memtx', memtx_use_mvcc_engine = true
    })
    table.insert(group_config, {
        engine = 'vinyl', memtx_use_mvcc_engine = true
    })
end

local test_group = t.group('storage', group_config)

local cfg_template = {
    sharding = {
        {
            replicas = {
                replica_1_a = {
                    master = true,
                },
                replica_1_b = {},
            },
        },
        {
            replicas = {
                replica_2_a = {
                    master = true,
                },
                replica_2_b = {},
            },
        },
    },
    bucket_count = 16,
    rebalancer_mode = 'off',
}
local global_cfg

test_group.before_all(function(g)
    cfg_template.memtx_use_mvcc_engine = g.params.memtx_use_mvcc_engine
    global_cfg = vtest.config_new(cfg_template)

    vtest.cluster_new(g, global_cfg)
    vtest.cluster_bootstrap(g, global_cfg)
end)

test_group.after_all(function(g)
    g.cluster:drop()
end)

local function find_rebalancer_name(g)
    local map, err = vtest.cluster_exec_each(g, function()
        return ivshard.storage.internal.rebalancer_fiber ~= nil
    end)
    t.assert_equals(err, nil)
    local rebalancer_name
    for name, has_rebalancer in pairs(map) do
        if has_rebalancer then
            t.assert_equals(rebalancer_name, nil)
            rebalancer_name = name
        end
    end
    return rebalancer_name
end

test_group.test_rebalancer_mode_off_no_marks = function(g)
    local _, err = vtest.cluster_exec_each(g, function()
        ilt.assert(not ivshard.storage.internal.rebalancer_fiber)
    end)
    t.assert_equals(err, nil)
end

test_group.test_rebalancer_mode_off_ignore_marks = function(g)
    local new_cfg_template = table.deepcopy(cfg_template)
    new_cfg_template.sharding[1].replicas.replica_1_a.rebalancer = true
    local new_cfg = vtest.config_new(new_cfg_template)
    vtest.cluster_cfg(g, new_cfg)
    local _, err = vtest.cluster_exec_each(g, function()
        ilt.assert(not ivshard.storage.internal.rebalancer_fiber)
    end)
    t.assert_equals(err, nil)
    -- Revert.
    vtest.cluster_cfg(g, global_cfg)
end

test_group.test_rebalancer_mode_auto_ignore_marks = function(g)
    local new_cfg_template = table.deepcopy(cfg_template)
    new_cfg_template.rebalancer_mode = 'auto'
    local new_cfg = vtest.config_new(new_cfg_template)
    local map, err = vtest.cluster_exec_each(g, function(cfg)
        -- Tell each instance it should be the rebalancer. But it must be
        -- ignored when mode = 'auto'.
        local info = box.info
        cfg.sharding[ivutil.replicaset_uuid(info)]
            .replicas[info.uuid].rebalancer = true
        local _, err = ivshard.storage.cfg(cfg, box.info.uuid)
        ilt.assert_equals(err, nil)
        return {
            has_rebalancer = ivshard.storage.internal.rebalancer_fiber ~= nil,
            uuid = box.info.uuid,
        }
    end, {new_cfg})
    t.assert_equals(err, nil)
    local min_uuid
    local rebalancer_uuid
    for _, res in pairs(map) do
        if not min_uuid or min_uuid > res.uuid then
            min_uuid = res.uuid
        end
        if res.has_rebalancer then
            t.assert_equals(rebalancer_uuid, nil)
            rebalancer_uuid = res.uuid
        end
    end
    t.assert_equals(min_uuid, rebalancer_uuid)
    -- Revert.
    vtest.cluster_cfg(g, global_cfg)
end

test_group.test_rebalancer_mode_manual_no_marks = function(g)
    local new_cfg_template = table.deepcopy(cfg_template)
    new_cfg_template.rebalancer_mode = 'manual'
    local new_cfg = vtest.config_new(new_cfg_template)
    vtest.cluster_cfg(g, new_cfg)
    local _, err = vtest.cluster_exec_each(g, function()
        ilt.assert(not ivshard.storage.internal.rebalancer_fiber)
    end)
    t.assert_equals(err, nil)
    -- Revert.
    vtest.cluster_cfg(g, global_cfg)
end

test_group.test_rebalancer_mode_manual_with_mark = function(g)

Contributor:

The test is flaky on my machine, latest tarantool master.

 | Tarantool 3.0.0-alpha1-110-g3774e85d2
 | Target: Linux-x86_64-Debug
 | Build options: cmake . -DCMAKE_INSTALL_PREFIX=/usr -DENABLE_BACKTRACE=TRUE
 | Compiler: GNU-13.1.1
 | C_FLAGS: -fexceptions -funwind-tables -fasynchronous-unwind-tables -fno-common -msse2 -Wformat -Wformat-security -Werror=format-security -fstack-protector-strong -fPIC -fmacro-prefix-map=/home/serpentian/Programming/tnt/tarantool=. -std=c11 -Wall -Wextra -Wno-gnu-alignof-expression -fno-gnu89-inline -Wno-cast-function-type -Werror
 | CXX_FLAGS: -fexceptions -funwind-tables -fasynchronous-unwind-tables -fno-common -msse2 -Wformat -Wformat-security -Werror=format-security -fstack-protector-strong -fPIC -fmacro-prefix-map=/home/serpentian/Programming/tnt/tarantool=. -std=c++11 -Wall -Wextra -Wno-invalid-offsetof -Wno-gnu-alignof-expression -Wno-cast-function-type -Werror
 
----------------------------------------------------
[004] storage-luatest/storage_2_2_test.lua
[004] TAP version 13
[004] 1..20
[004] # Started on Fri Aug 18 15:49:06 2023
[004] # Starting group: storage.engine:"memtx"
[004] ok     1	storage.engine:"memtx".test_rebalancer_mode_off_no_marks
[004] ok     2	storage.engine:"memtx".test_rebalancer_mode_off_ignore_marks
[004] ok     3	storage.engine:"memtx".test_rebalancer_mode_auto_ignore_marks
[004] ok     4	storage.engine:"memtx".test_rebalancer_mode_manual_no_marks
[004] not ok 5	storage.engine:"memtx".test_rebalancer_mode_manual_with_mark
[004] #   ...ing/tnt/vshard/test/storage-luatest/storage_2_2_test.lua:163: expected: nil, actual: {"bucket_id":10,"reason":"bucket state is changed: was receiving, became active","code":1,"destination":"00000000-0000-0000-0000-000000000001","type":"ShardingError","name":"WRONG_BUCKET","message":"Cannot perform action with bucket 10, reason: bucket state is changed: was receiving, became active"}
[004] #   stack traceback:
[004] #   	...ing/tnt/vshard/test/storage-luatest/storage_2_2_test.lua:155: in function 'storage.engine:"memtx".test_rebalancer_mode_manual_with_mark'
[004] #   	...
[004] #   	[C]: in function 'xpcall'

Collaborator (Author):

Oh, that is interesting! I couldn't reproduce it after a few hundred runs. But I might see the reason, and it is not related to the test or the patch. Does it reproduce reliably on your machine? If yes, can you try with this diff?

diff --git a/vshard/storage/init.lua b/vshard/storage/init.lua
index 5f34410..7221b7c 100644
--- a/vshard/storage/init.lua
+++ b/vshard/storage/init.lua
@@ -1790,6 +1790,9 @@ local function bucket_test_gc(bids)
     local _bucket = box.space._bucket
     local ref, bucket, status
     for i, bid in ipairs(bids) do
+        if M.rebalancer_transfering_buckets[bid] then
+            goto not_ok_bid
+        end
         bucket = _bucket:get(bid)
         status = bucket ~= nil and bucket.status or consts.BUCKET.GARBAGE
         if status ~= consts.BUCKET.GARBAGE and status ~= consts.BUCKET.SENT then
@@ -2396,6 +2399,9 @@ local function gc_bucket_process_sent_xc()
     for _, b in _bucket.index.status:pairs(consts.BUCKET.SENT) do
         i = i + 1
         local bid = b.id
+        if M.rebalancer_transfering_buckets[bid] then
+            goto continue
+        end
         local ref = M.bucket_refs[bid]
         if ref ~= nil then
             assert(ref.rw == 0)

Contributor (@Serpentian, Aug 19, 2023):

I don't see this error anymore. ref should be declared above the goto, though (see the short note after the diff). Applied the following:

Diff
diff --git a/vshard/storage/init.lua b/vshard/storage/init.lua
index 5f34410..8e4c6b2 100644
--- a/vshard/storage/init.lua
+++ b/vshard/storage/init.lua
@@ -1790,6 +1790,9 @@ local function bucket_test_gc(bids)
     local _bucket = box.space._bucket
     local ref, bucket, status
     for i, bid in ipairs(bids) do
+        if M.rebalancer_transfering_buckets[bid] then
+            goto not_ok_bid
+        end
         bucket = _bucket:get(bid)
         status = bucket ~= nil and bucket.status or consts.BUCKET.GARBAGE
         if status ~= consts.BUCKET.GARBAGE and status ~= consts.BUCKET.SENT then
@@ -2393,10 +2396,14 @@ local function gc_bucket_process_sent_xc()
     local batch = table.new(limit, 0)
     local i = 0
     local is_done = true
+    local ref
     for _, b in _bucket.index.status:pairs(consts.BUCKET.SENT) do
         i = i + 1
         local bid = b.id
-        local ref = M.bucket_refs[bid]
+        if M.rebalancer_transfering_buckets[bid] then
+            goto continue
+        end
+        ref = M.bucket_refs[bid]
         if ref ~= nil then
             assert(ref.rw == 0)
             if ref.ro ~= 0 then

But another error pops up pretty often. It seems that bucket sending/receiving hangs:

[004] not ok 15	storage.engine:"vinyl".test_rebalancer_mode_manual_with_mark
[004] #   ...ing/tnt/vshard/test/storage-luatest/storage_2_2_test.lua:178: expected: nil, actual: {
[004] #       replica_2_a = ...entian/Programming/tnt/vshard/test/instances/storage.lua:86: Still have SENT buckets,
[004] #   }
[004] #   stack traceback:
[004] #   	...ing/tnt/vshard/test/storage-luatest/storage_2_2_test.lua:178: in function 'storage.engine:"vinyl".test_rebalancer_mode_manual_with_mark'
[004] #   	...
[004] #   	[C]: in function 'xpcall'

Collaborator (Author):

I got it to fail only twice in an hour after thousands of runs, so I can't gather enough info. Could you please collect some logs with the diff below?

Diff
diff --git a/test/luatest_helpers/vtest.lua b/test/luatest_helpers/vtest.lua
index e767a26..6d4ba77 100644
--- a/test/luatest_helpers/vtest.lua
+++ b/test/luatest_helpers/vtest.lua
@@ -8,7 +8,7 @@ local yaml = require('yaml')
 local vrepset = require('vshard.replicaset')
 local log = require('log')
 
-local wait_timeout = 50
+local wait_timeout = 10
 -- Use it in busy-loops like `while !cond do fiber.sleep(busy_step) end`.
 local busy_step = 0.005
 local uuid_idx = 1
diff --git a/test/storage-luatest/storage_2_2_test.lua b/test/storage-luatest/storage_2_2_test.lua
index c33955f..6384c13 100644
--- a/test/storage-luatest/storage_2_2_test.lua
+++ b/test/storage-luatest/storage_2_2_test.lua
@@ -13,6 +13,8 @@ if vutil.feature.memtx_mvcc then
     })
 end
 
+group_config = {{engine = 'memtx'}}
+
 local test_group = t.group('storage', group_config)
 
 local cfg_template = {
@@ -48,7 +50,7 @@ test_group.before_all(function(g)
 end)
 
 test_group.after_all(function(g)
-    g.cluster:drop()
+    --g.cluster:drop()
 end)
 
 local function find_rebalancer_name(g)
@@ -66,117 +68,127 @@ local function find_rebalancer_name(g)
     return rebalancer_name
 end
 
-test_group.test_rebalancer_mode_off_no_marks = function(g)
-    local _, err = vtest.cluster_exec_each(g, function()
-        ilt.assert(not ivshard.storage.internal.rebalancer_fiber)
-    end)
-    t.assert_equals(err, nil)
-end
-
-test_group.test_rebalancer_mode_off_ignore_marks = function(g)
-    local new_cfg_template = table.deepcopy(cfg_template)
-    new_cfg_template.sharding[1].replicas.replica_1_a.rebalancer = true
-    local new_cfg = vtest.config_new(new_cfg_template)
-    vtest.cluster_cfg(g, new_cfg)
-    local _, err = vtest.cluster_exec_each(g, function()
-        ilt.assert(not ivshard.storage.internal.rebalancer_fiber)
-    end)
-    t.assert_equals(err, nil)
-    -- Revert.
-    vtest.cluster_cfg(g, global_cfg)
-end
-
-test_group.test_rebalancer_mode_auto_ignore_marks = function(g)
-    local new_cfg_template = table.deepcopy(cfg_template)
-    new_cfg_template.rebalancer_mode = 'auto'
-    local new_cfg = vtest.config_new(new_cfg_template)
-    local map, err = vtest.cluster_exec_each(g, function(cfg)
-        -- Tell each instance it should be the rebalancer. But it must be
-        -- ignored when mode = 'auto'.
-        local info = box.info
-        cfg.sharding[ivutil.replicaset_uuid(info)]
-            .replicas[info.uuid].rebalancer = true
-        local _, err = ivshard.storage.cfg(cfg, box.info.uuid)
-        ilt.assert_equals(err, nil)
-        return {
-            has_rebalancer = ivshard.storage.internal.rebalancer_fiber ~= nil,
-            uuid = box.info.uuid,
-        }
-    end, {new_cfg})
-    t.assert_equals(err, nil)
-    local min_uuid
-    local rebalancer_uuid
-    for _, res in pairs(map) do
-        if not min_uuid or min_uuid > res.uuid then
-            min_uuid = res.uuid
-        end
-        if res.has_rebalancer then
-            t.assert_equals(rebalancer_uuid, nil)
-            rebalancer_uuid = res.uuid
-        end
-    end
-    t.assert_equals(min_uuid, rebalancer_uuid)
-    -- Revert.
-    vtest.cluster_cfg(g, global_cfg)
-end
-
-test_group.test_rebalancer_mode_manual_no_marks = function(g)
-    local new_cfg_template = table.deepcopy(cfg_template)
-    new_cfg_template.rebalancer_mode = 'manual'
-    local new_cfg = vtest.config_new(new_cfg_template)
-    vtest.cluster_cfg(g, new_cfg)
-    local _, err = vtest.cluster_exec_each(g, function()
-        ilt.assert(not ivshard.storage.internal.rebalancer_fiber)
-    end)
-    t.assert_equals(err, nil)
-    -- Revert.
-    vtest.cluster_cfg(g, global_cfg)
-end
+-- test_group.test_rebalancer_mode_off_no_marks = function(g)
+--     local _, err = vtest.cluster_exec_each(g, function()
+--         ilt.assert(not ivshard.storage.internal.rebalancer_fiber)
+--     end)
+--     t.assert_equals(err, nil)
+-- end
+
+-- test_group.test_rebalancer_mode_off_ignore_marks = function(g)
+--     local new_cfg_template = table.deepcopy(cfg_template)
+--     new_cfg_template.sharding[1].replicas.replica_1_a.rebalancer = true
+--     local new_cfg = vtest.config_new(new_cfg_template)
+--     vtest.cluster_cfg(g, new_cfg)
+--     local _, err = vtest.cluster_exec_each(g, function()
+--         ilt.assert(not ivshard.storage.internal.rebalancer_fiber)
+--     end)
+--     t.assert_equals(err, nil)
+--     -- Revert.
+--     vtest.cluster_cfg(g, global_cfg)
+-- end
+
+-- test_group.test_rebalancer_mode_auto_ignore_marks = function(g)
+--     local new_cfg_template = table.deepcopy(cfg_template)
+--     new_cfg_template.rebalancer_mode = 'auto'
+--     local new_cfg = vtest.config_new(new_cfg_template)
+--     local map, err = vtest.cluster_exec_each(g, function(cfg)
+--         -- Tell each instance it should be the rebalancer. But it must be
+--         -- ignored when mode = 'auto'.
+--         local info = box.info
+--         cfg.sharding[ivutil.replicaset_uuid(info)]
+--             .replicas[info.uuid].rebalancer = true
+--         local _, err = ivshard.storage.cfg(cfg, box.info.uuid)
+--         ilt.assert_equals(err, nil)
+--         return {
+--             has_rebalancer = ivshard.storage.internal.rebalancer_fiber ~= nil,
+--             uuid = box.info.uuid,
+--         }
+--     end, {new_cfg})
+--     t.assert_equals(err, nil)
+--     local min_uuid
+--     local rebalancer_uuid
+--     for _, res in pairs(map) do
+--         if not min_uuid or min_uuid > res.uuid then
+--             min_uuid = res.uuid
+--         end
+--         if res.has_rebalancer then
+--             t.assert_equals(rebalancer_uuid, nil)
+--             rebalancer_uuid = res.uuid
+--         end
+--     end
+--     t.assert_equals(min_uuid, rebalancer_uuid)
+--     -- Revert.
+--     vtest.cluster_cfg(g, global_cfg)
+-- end
+
+-- test_group.test_rebalancer_mode_manual_no_marks = function(g)
+--     local new_cfg_template = table.deepcopy(cfg_template)
+--     new_cfg_template.rebalancer_mode = 'manual'
+--     local new_cfg = vtest.config_new(new_cfg_template)
+--     vtest.cluster_cfg(g, new_cfg)
+--     local _, err = vtest.cluster_exec_each(g, function()
+--         ilt.assert(not ivshard.storage.internal.rebalancer_fiber)
+--     end)
+--     t.assert_equals(err, nil)
+--     -- Revert.
+--     vtest.cluster_cfg(g, global_cfg)
+-- end
 
 test_group.test_rebalancer_mode_manual_with_mark = function(g)
-    local new_cfg_template = table.deepcopy(cfg_template)
-    new_cfg_template.rebalancer_mode = 'manual'
-
-    -- Try a master.
-    new_cfg_template.sharding[1].replicas.replica_1_a.rebalancer = true
-    local new_cfg = vtest.config_new(new_cfg_template)
-    vtest.cluster_cfg(g, new_cfg)
-    t.assert_equals(find_rebalancer_name(g), 'replica_1_a')
-
-    -- Try a replica.
-    new_cfg_template.sharding[1].replicas.replica_1_a.rebalancer = nil
-    new_cfg_template.sharding[2].replicas.replica_2_b.rebalancer = true
-    new_cfg = vtest.config_new(new_cfg_template)
-    vtest.cluster_cfg(g, new_cfg)
-    t.assert_equals(find_rebalancer_name(g), 'replica_2_b')
-
-    -- Ensure it works even on the replica.
-    vtest.cluster_rebalancer_disable(g)
-    g.replica_1_a:exec(function(rs2_uuid)
-        while true do
-            local bid = _G.get_first_bucket()
-            if not bid then
-                break
+    for iter = 1, 10 do
+        local new_cfg_template = table.deepcopy(cfg_template)
+        new_cfg_template.rebalancer_mode = 'manual'
+
+        -- Try a master.
+        new_cfg_template.sharding[1].replicas.replica_1_a.rebalancer = true
+        local new_cfg = vtest.config_new(new_cfg_template)
+        vtest.cluster_cfg(g, new_cfg)
+        t.assert_equals(find_rebalancer_name(g), 'replica_1_a')
+
+        -- Try a replica.
+        new_cfg_template.sharding[1].replicas.replica_1_a.rebalancer = nil
+        new_cfg_template.sharding[2].replicas.replica_2_b.rebalancer = true
+        new_cfg = vtest.config_new(new_cfg_template)
+        vtest.cluster_cfg(g, new_cfg)
+        t.assert_equals(find_rebalancer_name(g), 'replica_2_b')
+
+        -- Ensure it works even on the replica.
+        vtest.cluster_rebalancer_disable(g)
+        g.replica_1_a:exec(function(rs2_uuid)
+            while true do
+                local bid = _G.get_first_bucket()
+                if not bid then
+                    break
+                end
+                local _, err = ivshard.storage.bucket_send(
+                    bid, rs2_uuid, {timeout = iwait_timeout})
+                ilt.assert_equals(err, nil)
             end
-            local _, err = ivshard.storage.bucket_send(
-                bid, rs2_uuid, {timeout = iwait_timeout})
-            ilt.assert_equals(err, nil)
+        end, {g.replica_2_a:replicaset_uuid()})
+        vtest.cluster_rebalancer_enable(g)
+
+        g.replica_2_b:exec(function()
+            ivtest.service_wait_for_new_ok(
+                ivshard.storage.internal.rebalancer_service, {
+                    on_yield = ivshard.storage.rebalancer_wakeup
+                })
+        end)
+        local _, err = vtest.cluster_exec_each_master(g, function(bucket_count)
+            _G.bucket_gc_wait()
+            ilt.assert_equals(ivshard.storage.buckets_count(), bucket_count)
+        end, {new_cfg_template.bucket_count / #new_cfg_template.sharding})
+        if err then
+            vtest.cluster_exec_each_master(g, function()
+                local log = require('log')
+                local yaml = require('yaml')
+                log.info(yaml.encode(ivshard.storage.internal.rebalancer_transfering_buckets))
+                log.info(yaml.encode(box.space._bucket:select()))
+            end)
         end
-    end, {g.replica_2_a:replicaset_uuid()})
-    vtest.cluster_rebalancer_enable(g)
-
-    g.replica_2_b:exec(function()
-        ivtest.service_wait_for_new_ok(
-            ivshard.storage.internal.rebalancer_service, {
-                on_yield = ivshard.storage.rebalancer_wakeup
-            })
-    end)
-    local _, err = vtest.cluster_exec_each_master(g, function(bucket_count)
-        _G.bucket_gc_wait()
-        ilt.assert_equals(ivshard.storage.buckets_count(), bucket_count)
-    end, {new_cfg_template.bucket_count / #new_cfg_template.sharding})
-    t.assert_equals(err, nil)
+        t.assert_equals(err, nil)
 
-    -- Revert.
-    vtest.cluster_cfg(g, global_cfg)
+        -- Revert.
+        vtest.cluster_cfg(g, global_cfg)
+    end
 end
diff --git a/vshard/storage/init.lua b/vshard/storage/init.lua
index 5f34410..8e4c6b2 100644
--- a/vshard/storage/init.lua
+++ b/vshard/storage/init.lua
@@ -1790,6 +1790,9 @@ local function bucket_test_gc(bids)
     local _bucket = box.space._bucket
     local ref, bucket, status
     for i, bid in ipairs(bids) do
+        if M.rebalancer_transfering_buckets[bid] then
+            goto not_ok_bid
+        end
         bucket = _bucket:get(bid)
         status = bucket ~= nil and bucket.status or consts.BUCKET.GARBAGE
         if status ~= consts.BUCKET.GARBAGE and status ~= consts.BUCKET.SENT then
@@ -2393,10 +2396,14 @@ local function gc_bucket_process_sent_xc()
     local batch = table.new(limit, 0)
     local i = 0
     local is_done = true
+    local ref
     for _, b in _bucket.index.status:pairs(consts.BUCKET.SENT) do
         i = i + 1
         local bid = b.id
-        local ref = M.bucket_refs[bid]
+        if M.rebalancer_transfering_buckets[bid] then
+            goto continue
+        end
+        ref = M.bucket_refs[bid]
         if ref ~= nil then
             assert(ref.rw == 0)
             if ref.ro ~= 0 then

I don't think that bucket sending hangs; it would have failed earlier in that case. The error is at waiting for GC to complete: some SENT buckets are not GC-ed. It could be caused by my previous diff, which ignores M.rebalancer_transfering_buckets. But I don't see any place where it could be set and then not unset back.

Contributor (@Serpentian, Aug 22, 2023):

Here're the logs:
logs.tar.gz

I'll try to investigate the issue tonight or tomorrow.

Collaborator (Author):

From the logs it looks like M.rebalancer_transfering_buckets is not the problem: it is completely empty when the failure happens, and no errors are reported by the bucket GC service. I would appreciate it if you could investigate it, thanks. I simply can't reproduce it more often than a couple of times per hour of running.

Contributor:

I finally figured this out. I applied the following patch: it's always the bucket with id = 8 that hangs in SENT status.

Diff
diff --git a/test/storage-luatest/storage_2_2_test.lua b/test/storage-luatest/storage_2_2_test.lua
index c33955f..8a1b685 100644
--- a/test/storage-luatest/storage_2_2_test.lua
+++ b/test/storage-luatest/storage_2_2_test.lua
@@ -2,6 +2,8 @@ local t = require('luatest')
 local vtest = require('test.luatest_helpers.vtest')
 local vutil = require('vshard.util')
 
+local fiber = require('fiber')
+
 local group_config = {{engine = 'memtx'}, {engine = 'vinyl'}}
 
 if vutil.feature.memtx_mvcc then
@@ -48,7 +50,7 @@ test_group.before_all(function(g)
 end)
 
 test_group.after_all(function(g)
-    g.cluster:drop()
+    g.cluster:stop()
 end)
 
 local function find_rebalancer_name(g)
@@ -175,6 +177,15 @@ test_group.test_rebalancer_mode_manual_with_mark = function(g)
         _G.bucket_gc_wait()
         ilt.assert_equals(ivshard.storage.buckets_count(), bucket_count)
     end, {new_cfg_template.bucket_count / #new_cfg_template.sharding})
+    if (err) then
+        vtest.cluster_exec_each(g, function()
+            local log = require('log')
+            local yaml = require('yaml')
+            log.info('ERROR')
+            log.info(yaml.encode(ivshard.storage.internal.rebalancer_transfering_buckets))
+            log.info(yaml.encode(box.space._bucket:select()))
+        end)
+    end
     t.assert_equals(err, nil)
 
     -- Revert.
diff --git a/vshard/storage/init.lua b/vshard/storage/init.lua
index 5f34410..5ea00e5 100644
--- a/vshard/storage/init.lua
+++ b/vshard/storage/init.lua
@@ -1790,6 +1790,9 @@ local function bucket_test_gc(bids)
     local _bucket = box.space._bucket
     local ref, bucket, status
     for i, bid in ipairs(bids) do
+        if M.rebalancer_transfering_buckets[bid] then
+            goto not_ok_bid
+        end
         bucket = _bucket:get(bid)
         status = bucket ~= nil and bucket.status or consts.BUCKET.GARBAGE
         if status ~= consts.BUCKET.GARBAGE and status ~= consts.BUCKET.SENT then
@@ -2155,6 +2158,9 @@ local function bucket_send(bucket_id, destination, opts)
     if type(bucket_id) ~= 'number' or type(destination) ~= 'string' then
         error('Usage: bucket_send(bucket_id, destination)')
     end
+    if (bucket_id == 8) then
+        log.info('starting sending bucket 8')
+    end
     M.rebalancer_transfering_buckets[bucket_id] = true
     local exception_guard = {}
     local status, ret, err = pcall(bucket_send_xc, bucket_id, destination, opts,
@@ -2163,6 +2169,9 @@ local function bucket_send(bucket_id, destination, opts)
         exception_guard.ref.rw_lock = false
     end
     M.rebalancer_transfering_buckets[bucket_id] = nil
+    if (bucket_id == 8) then
+        log.info('end sending bucket 8')
+    end
     if status then
         if ret then
             return ret
@@ -2393,10 +2402,17 @@ local function gc_bucket_process_sent_xc()
     local batch = table.new(limit, 0)
     local i = 0
     local is_done = true
+    local ref
     for _, b in _bucket.index.status:pairs(consts.BUCKET.SENT) do
         i = i + 1
         local bid = b.id
-        local ref = M.bucket_refs[bid]
+        if M.rebalancer_transfering_buckets[bid] then
+            if (bid == 8) then
+                log.info('rebalancer_transferring_buckets')
+            end
+            goto continue
+        end
+        ref = M.bucket_refs[bid]
         if ref ~= nil then
             assert(ref.rw == 0)
             if ref.ro ~= 0 then
@@ -2407,6 +2423,7 @@ local function gc_bucket_process_sent_xc()
         if #batch < limit then
             goto continue
         end
+        log.info('Processing batch: ' .. require('yaml').encode(batch))
         is_done = gc_bucket_process_sent_one_batch_xc(batch) and is_done
         batch = table.new(limit, 0)
     ::continue::
@@ -2415,6 +2432,7 @@ local function gc_bucket_process_sent_xc()
         end
     end
     if next(batch) then
+        log.info('Processing batch: ' .. require('yaml').encode(batch))
         is_done = gc_bucket_process_sent_one_batch_xc(batch) and is_done
     end
     return is_done
@@ -2479,6 +2497,7 @@ local function gc_bucket_service_f(service)
             service:set_activity('gc garbage')
             status, err = gc_bucket_drop(consts.BUCKET.GARBAGE, route_map)
             if status then
+                log.info('PROCESSING SENT')
                 service:set_activity('gc sent')
                 status, is_done, err = gc_bucket_process_sent()
             end

replica_2_a is the one that failed. Here are its logs:

replica_2_a
2023-08-23 22:40:05.589 [456440] main/119/vshard.gc/vshard.storage I> PROCESSING SENT
2023-08-23 22:40:05.590 [456440] main/162/vshard.rebalancer_applier/vshard.storage I> Apply rebalancer routes with 1 workers:
---
00000000-0000-0000-0000-000000000001: 8
...

2023-08-23 22:40:05.590 [456440] main/162/vshard.rebalancer_applier/vshard.storage I> Rebalancer workers have started, wait for their termination
2023-08-23 22:40:05.590 [456440] main/119/vshard.gc/vshard.storage I> PROCESSING SENT
2023-08-23 22:40:05.592 [456440] main/119/vshard.gc/vshard.storage I> PROCESSING SENT
2023-08-23 22:40:05.594 [456440] main/119/vshard.gc/vshard.storage I> PROCESSING SENT
2023-08-23 22:40:05.594 [456440] main/119/vshard.gc/vshard.storage I> Processing batch: ---
- 1
...

2023-08-23 22:40:05.596 [456440] main/164/unix/:/home/serpentian/Programming/tnt/vshard/test/var/010_storage-luatest/replica_2_b.iproto (net.box)/vshard.replicaset I> connected to unix/:/home/serpentian/Programming/tnt/vshard/test/var/010_storage-luatest/replica_2_b.iproto
2023-08-23 22:40:05.598 [456440] main/119/vshard.gc/vshard.storage I> PROCESSING SENT
2023-08-23 22:40:05.598 [456440] main/119/vshard.gc/vshard.storage I> PROCESSING SENT
2023-08-23 22:40:05.599 [456440] main/119/vshard.gc/vshard.storage I> PROCESSING SENT
2023-08-23 22:40:05.602 [456440] main/119/vshard.gc/vshard.storage I> PROCESSING SENT
2023-08-23 22:40:05.602 [456440] main/119/vshard.gc/vshard.storage I> Processing batch: ---
- 2
...

2023-08-23 22:40:05.606 [456440] main/119/vshard.gc/vshard.storage I> PROCESSING SENT
2023-08-23 22:40:05.606 [456440] main/119/vshard.gc/vshard.storage I> PROCESSING SENT
2023-08-23 22:40:05.607 [456440] main/119/vshard.gc/vshard.storage I> PROCESSING SENT
2023-08-23 22:40:05.608 [456440] main/119/vshard.gc/vshard.storage I> PROCESSING SENT
2023-08-23 22:40:05.608 [456440] main/119/vshard.gc/vshard.storage I> Processing batch: ---
- 3
...

2023-08-23 22:40:05.609 [456440] main/119/vshard.gc/vshard.storage I> PROCESSING SENT
2023-08-23 22:40:05.609 [456440] main/119/vshard.gc/vshard.storage I> PROCESSING SENT
2023-08-23 22:40:05.611 [456440] main/119/vshard.gc/vshard.storage I> PROCESSING SENT
2023-08-23 22:40:05.612 [456440] main/119/vshard.gc/vshard.storage I> PROCESSING SENT
2023-08-23 22:40:05.612 [456440] main/119/vshard.gc/vshard.storage I> Processing batch: ---
- 4
...

2023-08-23 22:40:05.613 [456440] main/119/vshard.gc/vshard.storage I> PROCESSING SENT
2023-08-23 22:40:05.613 [456440] main/119/vshard.gc/vshard.storage I> PROCESSING SENT
2023-08-23 22:40:05.614 [456440] main/119/vshard.gc/vshard.storage I> PROCESSING SENT
2023-08-23 22:40:05.615 [456440] main/119/vshard.gc/vshard.storage I> PROCESSING SENT
2023-08-23 22:40:05.615 [456440] main/119/vshard.gc/vshard.storage I> Processing batch: ---
- 5
...

2023-08-23 22:40:05.615 [456440] main/119/vshard.gc/vshard.storage I> PROCESSING SENT
2023-08-23 22:40:05.615 [456440] main/119/vshard.gc/vshard.storage I> PROCESSING SENT
2023-08-23 22:40:05.617 [456440] main/119/vshard.gc/vshard.storage I> PROCESSING SENT
2023-08-23 22:40:05.618 [456440] main/119/vshard.gc/vshard.storage I> PROCESSING SENT
2023-08-23 22:40:05.618 [456440] main/119/vshard.gc/vshard.storage I> Processing batch: ---
- 6
...

2023-08-23 22:40:05.619 [456440] main/119/vshard.gc/vshard.storage I> PROCESSING SENT
2023-08-23 22:40:05.619 [456440] main/119/vshard.gc/vshard.storage I> PROCESSING SENT
2023-08-23 22:40:05.619 [456440] main/119/vshard.gc/vshard.storage I> PROCESSING SENT
2023-08-23 22:40:05.620 [456440] main/163/vshard.rebalancer_worker_1/vshard.storage I> starting sending bucket 8
2023-08-23 22:40:05.620 [456440] main/119/vshard.gc/vshard.storage I> PROCESSING SENT
2023-08-23 22:40:05.620 [456440] main/119/vshard.gc/vshard.storage I> Processing batch: ---
- 7
...

2023-08-23 22:40:05.622 [456440] main/119/vshard.gc/vshard.storage I> PROCESSING SENT
2023-08-23 22:40:05.622 [456440] main/119/vshard.gc/vshard.storage I> PROCESSING SENT
2023-08-23 22:40:05.623 [456440] main/119/vshard.gc/vshard.storage I> PROCESSING SENT
2023-08-23 22:40:05.623 [456440] main/119/vshard.gc/vshard.storage I> rebalancer_transferring_buckets
2023-08-23 22:40:05.623 [456440] main/163/vshard.rebalancer_worker_1/vshard.storage I> end sending bucket 8
2023-08-23 22:40:05.623 [456440] main/163/vshard.rebalancer_worker_1/vshard.storage I> 8 buckets were successfully sent to 00000000-0000-0000-0000-000000000001
2023-08-23 22:40:05.623 [456440] main/162/vshard.rebalancer_applier/vshard.storage I> Rebalancer routes are applied
2023-08-23 22:40:55.667 [456440] main/109/main/test.storage-luatest.storage_2_2_test I> ERROR
2023-08-23 22:40:55.667 [456440] main/109/main/test.storage-luatest.storage_2_2_test I> --- []
...

2023-08-23 22:40:55.667 [456440] main/109/main C> Potentially long select from space '_bucket' (513)
 stack traceback:
	builtin/box/schema.lua:2389: in function 'log_long_select'
	builtin/box/schema.lua:2406: in function 'select'
	...ing/tnt/vshard/test/storage-luatest/storage_2_2_test.lua:191: in function <...ing/tnt/vshard/test/storage-luatest/storage_2_2_test.lua:186>
	[builtin#20]: at 0x56450864c289
	[C]: at 0x564508556817
2023-08-23 22:40:55.667 [456440] main/109/main/test.storage-luatest.storage_2_2_test I> ---
- [8, 'sent', '00000000-0000-0000-0000-000000000001']
- [9, 'active']
- [10, 'active']
- [11, 'active']
- [12, 'active']
- [13, 'active']
- [14, 'active']
- [15, 'active']
- [16, 'active']
...

Bucket 8 is still being sent when GC tries to turn it into GARBAGE. GC cannot do that, so it just skips the bucket. The problem is that GC then claims that everything is all right: it updates bucket_generation_collected and never starts garbage collecting again:

vshard/vshard/storage/init.lua

Lines 2478 to 2503 in b3c27b3

        if bucket_generation_collected ~= bucket_generation_current then
            service:set_activity('gc garbage')
            status, err = gc_bucket_drop(consts.BUCKET.GARBAGE, route_map)
            if status then
                service:set_activity('gc sent')
                status, is_done, err = gc_bucket_process_sent()
            end
            if not status then
                box.rollback()
                log.error(service:set_status_error(
                    'Error during garbage collection step: %s', err))
            elseif is_done then
                -- Don't use global generation. During the collection it could
                -- already change. Instead, remember the generation known before
                -- the collection has started.
                -- Since the collection also changes the generation, it makes
                -- the GC happen always at least twice. But typically on the
                -- second iteration it should not find any buckets to collect,
                -- and then the collected generation matches the global one.
                bucket_generation_collected = bucket_generation_current
            else
                -- Needs a retry. Can happen if not all SENT buckets were able
                -- to turn into GARBAGE.
                status = false
            end
        else

So here's the patch that stabilizes the test:

diff --git a/vshard/storage/init.lua b/vshard/storage/init.lua
index 5f34410..72aa8bd 100644
--- a/vshard/storage/init.lua
+++ b/vshard/storage/init.lua
@@ -1790,6 +1790,9 @@ local function bucket_test_gc(bids)
     local _bucket = box.space._bucket
     local ref, bucket, status
     for i, bid in ipairs(bids) do
+        if M.rebalancer_transfering_buckets[bid] then
+            goto not_ok_bid
+        end
         bucket = _bucket:get(bid)
         status = bucket ~= nil and bucket.status or consts.BUCKET.GARBAGE
         if status ~= consts.BUCKET.GARBAGE and status ~= consts.BUCKET.SENT then
@@ -2393,13 +2396,19 @@ local function gc_bucket_process_sent_xc()
     local batch = table.new(limit, 0)
     local i = 0
     local is_done = true
+    local ref
     for _, b in _bucket.index.status:pairs(consts.BUCKET.SENT) do
         i = i + 1
         local bid = b.id
-        local ref = M.bucket_refs[bid]
+        if M.rebalancer_transfering_buckets[bid] then
+            is_done = false
+            goto continue
+        end
+        ref = M.bucket_refs[bid]
         if ref ~= nil then
             assert(ref.rw == 0)
             if ref.ro ~= 0 then
+                is_done = false
                 goto continue
             end
         end

But it looks like the storage/scheduler hangs with this patch. It also happens with the initial patch you proposed. Can you reproduce this hang?

Collaborator (Author):

Thanks for the help! I found the reason and created a ticket for it, since it is clearly not related to this PR: #433.

Also, I will probably pause this PR; it might turn out that we won't need it. I realized some things about how the rebalancer will have to coexist with master='auto' on storages, which might render this PR useless.

    local new_cfg_template = table.deepcopy(cfg_template)
    new_cfg_template.rebalancer_mode = 'manual'

    -- Try a master.
    new_cfg_template.sharding[1].replicas.replica_1_a.rebalancer = true
    local new_cfg = vtest.config_new(new_cfg_template)
    vtest.cluster_cfg(g, new_cfg)
    t.assert_equals(find_rebalancer_name(g), 'replica_1_a')

    -- Try a replica.
    new_cfg_template.sharding[1].replicas.replica_1_a.rebalancer = nil
    new_cfg_template.sharding[2].replicas.replica_2_b.rebalancer = true
    new_cfg = vtest.config_new(new_cfg_template)
    vtest.cluster_cfg(g, new_cfg)
    t.assert_equals(find_rebalancer_name(g), 'replica_2_b')

    -- Ensure it works even on the replica.
    vtest.cluster_rebalancer_disable(g)
    g.replica_1_a:exec(function(rs2_uuid)
        while true do
            local bid = _G.get_first_bucket()
            if not bid then
                break
            end
            local _, err = ivshard.storage.bucket_send(
                bid, rs2_uuid, {timeout = iwait_timeout})
            ilt.assert_equals(err, nil)
        end
    end, {g.replica_2_a:replicaset_uuid()})
    vtest.cluster_rebalancer_enable(g)

    g.replica_2_b:exec(function()
        ivtest.service_wait_for_new_ok(
            ivshard.storage.internal.rebalancer_service, {
                on_yield = ivshard.storage.rebalancer_wakeup
            })
    end)
    local _, err = vtest.cluster_exec_each_master(g, function(bucket_count)
        _G.bucket_gc_wait()
        ilt.assert_equals(ivshard.storage.buckets_count(), bucket_count)
    end, {new_cfg_template.bucket_count / #new_cfg_template.sharding})
    t.assert_equals(err, nil)

    -- Revert.
    vtest.cluster_cfg(g, global_cfg)
end
46 changes: 46 additions & 0 deletions test/unit-luatest/config_test.lua
@@ -91,3 +91,49 @@ g.test_replica_listen = function()
    t.assert_equals(rep_1_a.uri, url1, 'uri value')
    t.assert_equals(rep_1_a.listen, {url1, url2}, 'listen value')
end

g.test_rebalancer_mode = function()
    local config = {
        sharding = {
            storage_1_uuid = {
                replicas = {
                    storage_1_a_uuid = {
                        uri = 'storage:storage@127.0.0.1:3301',
                        name = 'storage_1_a',
                    },
                },
            },
            storage_2_uuid = {
                replicas = {
                    storage_2_a_uuid = {
                        uri = 'storage:storage@127.0.0.1:3302',
                        name = 'storage_2_a',
                    },
                }
            }
        },
        rebalancer_mode = 'off',
    }
    t.assert(vcfg.check(config), 'mode = off')
    local sharding = config.sharding
    local storage_1_a = sharding.storage_1_uuid.replicas.storage_1_a_uuid
    storage_1_a.rebalancer = true
    t.assert(vcfg.check(config), 'mode = off with one marked')
    storage_1_a.rebalancer = nil

    config.rebalancer_mode = 'auto'
    t.assert(vcfg.check(config), 'mode = auto')
    storage_1_a.rebalancer = true
    t.assert(vcfg.check(config), 'mode = auto with one marked')
    storage_1_a.rebalancer = nil

    config.rebalancer_mode = 'manual'
    t.assert(vcfg.check(config), 'mode = manual')
    storage_1_a.rebalancer = true
    t.assert(vcfg.check(config), 'mode = manual with one marked')

    local storage_2_a = sharding.storage_2_uuid.replicas.storage_2_a_uuid
    storage_2_a.rebalancer = true
    t.assert_error_msg_contains('More than one rebalancer is found',
                                vcfg.check, config)
end
4 changes: 2 additions & 2 deletions test/unit/config.result
@@ -462,9 +462,9 @@ lcfg.check(cfg)['sharding']
 - replicaset_uuid:
     replicas:
       replica_uuid:
-        master: true
         uri: 127.0.0.1
         name: storage
+        master: true
         weight: 100000
 ...
 replica.uri = 'user:password@localhost'
@@ -475,9 +475,9 @@ lcfg.check(cfg)['sharding']
 - replicaset_uuid:
     replicas:
       replica_uuid:
-        master: true
         uri: user:password@localhost
         name: storage
+        master: true
         weight: 100000
 ...
 replica.url = old_uri
20 changes: 20 additions & 0 deletions vshard/cfg.lua
@@ -44,6 +44,12 @@ local function check_replicaset_master(master)
    end
end

local function cfg_check_rebalancer_mode(mode)
    if mode ~= 'auto' and mode ~= 'manual' and mode ~= 'off' then
        error('Only "auto", "manual", and "off" are supported')
    end
end

local function is_number(v)
    return type(v) == 'number' and v == v
end
@@ -148,6 +154,7 @@ local replica_template = {
        type = 'boolean', name = "Master", is_optional = true,
        check = check_replica_master
    },
    rebalancer = {type = 'boolean', name = 'Rebalancer', is_optional = true},
}

local function check_replicas(replicas)
@@ -209,6 +216,7 @@ local function check_sharding(sharding)
    local uuids = {}
    local uris = {}
    local names = {}
    local rebalancer_uuid
    local is_all_weights_zero = true
    for replicaset_uuid, replicaset in pairs(sharding) do
        if uuids[replicaset_uuid] then
@@ -252,6 +260,14 @@
                    names[name] = 2
                end
            end
            if replica.rebalancer then
                if rebalancer_uuid then
                    error(string.format('More than one rebalancer is found: '..
                                        'on replicas %s and %s', replica_uuid,
                                        rebalancer_uuid))
                end
                rebalancer_uuid = replica_uuid
            end
        end
        is_all_weights_zero = is_all_weights_zero and replicaset.weight == 0
    end
@@ -278,6 +294,10 @@ local cfg_template = {
        type = 'positive integer', name = 'Bucket count', is_optional = true,
        default = consts.DEFAULT_BUCKET_COUNT
    },
    rebalancer_mode = {
        type = 'string', name = 'Rebalancer mode', is_optional = true,
        default = 'auto', check = cfg_check_rebalancer_mode,
    },
    rebalancer_disbalance_threshold = {
        type = 'non-negative number', name = 'Rebalancer disbalance threshold',
        is_optional = true,
41 changes: 34 additions & 7 deletions vshard/storage/init.lua
@@ -3315,6 +3315,32 @@ end
 -- Configuration
 --------------------------------------------------------------------------------
 
+local function storage_cfg_find_rebalancer(cfg)
+    local mode = cfg.rebalancer_mode
+    if mode == 'off' then
+        return
+    end
+    local min_master_uuid
+    local rebalancer_uuid
+    for _, rs in pairs(cfg.sharding) do
+        for replica_uuid, replica in pairs(rs.replicas) do
+            if replica.rebalancer then
+                assert(not rebalancer_uuid)
+                rebalancer_uuid = replica_uuid
+            end
+            if replica.master and (not min_master_uuid or
+                                   replica_uuid < min_master_uuid) then
+                min_master_uuid = replica_uuid
+            end
+        end
+    end
+    if mode == 'auto' then
+        return min_master_uuid
+    end
+    assert(mode == 'manual')
+    return rebalancer_uuid
+end
+
 local function storage_cfg(cfg, this_replica_uuid, is_reload)
     if this_replica_uuid == nil then
         error('Usage: cfg(configuration, this_replica_uuid)')
@@ -3337,13 +3363,8 @@ local function storage_cfg(cfg, this_replica_uuid, is_reload)
     local this_replicaset
     local this_replica
     local new_replicasets = lreplicaset.buildall(vshard_cfg)
-    local min_master
     for _, rs in pairs(new_replicasets) do
         for replica_uuid, replica in pairs(rs.replicas) do
-            if (min_master == nil or replica_uuid < min_master.uuid) and
-               rs.master == replica then
-                min_master = replica
-            end
             if replica_uuid == this_replica_uuid then
                 assert(this_replicaset == nil)
                 this_replicaset = rs
@@ -3481,7 +3502,8 @@ local function storage_cfg(cfg, this_replica_uuid, is_reload)
         local_on_master_enable()
     end
 
-    if min_master == this_replica then
+    local rebalancer_uuid = storage_cfg_find_rebalancer(vshard_cfg)
+    if rebalancer_uuid == this_replica.uuid then
         if not M.rebalancer_fiber then
             M.rebalancer_fiber =
                 util.reloadable_fiber_create('vshard.rebalancer', M,
@@ -3492,7 +3514,12 @@ local function storage_cfg(cfg, this_replica_uuid, is_reload)
             M.rebalancer_fiber:wakeup()
         end
     elseif M.rebalancer_fiber then
-        log.info('Rebalancer location has changed to %s', min_master)
+        if rebalancer_uuid then
+            log.info('Rebalancer location has changed to instance %s',
+                     rebalancer_uuid)
+        else
+            log.info('Rebalancer is turned off')
+        end
         M.rebalancer_fiber:cancel()
         M.rebalancer_fiber = nil
     end