Skip to content

Commit

Permalink
router: calls affect temporary prioritized replica
Browse files Browse the repository at this point in the history
Previously prioritized replica was changed only if it was disconnected
for FAILOVER_DOWN_TIMEOUT seconds. However, if connection is shows as
'connected' it doesn't mean, that this connection actually works. The
connection must be pingable in order to be operational.

This commit makes failover temporary lower replica's priority if
FAILOVER_DOWN_SEQUENTIAL_FAIL requests fail to it. All vshard internal
requests (including failover ping) and all user calls affect the number
of sequentially failed requests. Note, that we consider request
failed, when net.box connection is not operational (cannot make
conn.call, e.g. connection is not yet established or timeout is
reached), user functions throwing errors won't affect prioritized
replica.

The behavior of failover is the following after this commit:

1. Failover pings all prioritized replicas. If ping doesn't succeed, the
   connection is recreated, which is needed, if user returns too big
   values from the functions, in such case no other request can be done
   until this value is returned. Failed ping affects the number of
   sequentially failed requests.

2. If connection is down for >= than FAILOVER_DOWN_TIMEOUT or if the
   number of sequentially failed requests is >=
   FAILOVER_DOWN_SEQUENTIAL_FAIL, than we take replica with lower
   priority as the main one.

3. If failover didn't try to use the more prioritized replica (according
   to weights) for more than FAILOVER_UP_TIMEOUT, then we try to set a
   new replica as the prioritized one. Note, that we don't set it, if
   ping to it didn't succeed during ping round in (1).

Closes #483

NO_DOC=bugfix
  • Loading branch information
Serpentian committed Aug 16, 2024
1 parent 8b7524c commit 9fc976d
Show file tree
Hide file tree
Showing 11 changed files with 369 additions and 49 deletions.
1 change: 1 addition & 0 deletions test/instances/router.lua
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ _G.ilt = require('luatest')
_G.imsgpack = require('msgpack')
_G.ivtest = require('test.luatest_helpers.vtest')
_G.ivconst = require('vshard.consts')
_G.iverror = require('vshard.error')
_G.iwait_timeout = _G.ivtest.wait_timeout

-- Do not load entire vshard into the global namespace to catch errors when code
Expand Down
243 changes: 243 additions & 0 deletions test/router-luatest/router_test.lua
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
local t = require('luatest')
local vtest = require('test.luatest_helpers.vtest')
local vutil = require('vshard.util')
local vconsts = require('vshard.consts')

local g = t.group('router')
local cfg_template = {
Expand Down Expand Up @@ -861,3 +862,245 @@ g.test_request_timeout = function(g)
_G.sleep_num = nil
end)
end

local function prepare_affect_priority_rs(g)
local new_cfg_template = table.deepcopy(cfg_template)
new_cfg_template.sharding[1].replicas.replica_1_a.zone = 3
new_cfg_template.sharding[1].replicas.replica_1_b.zone = 2
new_cfg_template.zone = 1
new_cfg_template.weights = {
[1] = {
[1] = 0,
[2] = 1,
[3] = 2,
},
}
-- So that ping timeout is always > replica.net_timeout.
-- net_timeout starts with CALL_TIMEOUT_MIN and is mutiplied by 2 if number
-- of failed requests is >= 2.
new_cfg_template.failover_ping_timeout = vconsts.CALL_TIMEOUT_MIN * 4
local new_cluster_cfg = vtest.config_new(new_cfg_template)
vtest.router_cfg(g.router, new_cluster_cfg)
end

local function affect_priority_clear_net_timeout(g)
g.router:exec(function()
-- Reset net_timeout, so that it doesn't affect the test. This is
-- needed as we use the absolute minimum failover_ping_timeout for
-- FAILOVER_DOWN_SEQUENTIAL_FAIL = 3. 10 successful calls are needed
-- to restore it to CALL_TIMEOUT_MIN wthout reset.
local router = ivshard.router.internal.static_router
for _, rs in pairs(router.replicasets) do
for _, r in pairs(rs.replicas) do
r.net_timeout = ivconst.CALL_TIMEOUT_MIN
end
end
end)
end

--
-- gh-483: failover ping temporary lower replica's priority, when it cannot be
-- reached several times in a row:
--
-- 1. replica_1_b is the prioritized one. replica_1_a is the second one.
-- 2. router establishes connection to all instances, failover sets prioritized
-- replica_1_b.
-- 3. Node breaks and stops to respond.
-- 4. Failover retries ping FAILOVER_DOWN_SEQUENTIAL_FAIL times and changes
-- prioritized replica to the lower one. Note, that connection is recreated
-- on every failed ping.
-- 5. Every FAILOVER_UP_TIMEOUT failover checks, if any replica with higher
-- priority can be reached and changes the prioritized replica if it's so.
--
g.test_failover_ping_affects_priority = function()
prepare_affect_priority_rs(g)

-- Find prioritized replica and disable failover for now.
g.router:exec(function(rs_uuid, replica_uuid)
local router = ivshard.router.internal.static_router
local rs = router.replicasets[rs_uuid]
local opts = {timeout = iwait_timeout}
rs:wait_connected_all(opts)

t.helpers.retrying(opts, function()
router.failover_fiber:wakeup()
t.assert_equals(rs.replica.uuid, replica_uuid,
'Prioritized replica have not been set yet')
end)

local errinj = ivshard.router.internal.errinj
errinj.ERRINJ_FAILOVER_DELAY = true
t.helpers.retrying(opts, function()
router.failover_fiber:wakeup()
t.assert_equals(errinj.ERRINJ_FAILOVER_DELAY, 'in',
'Failover have not been stopped yet')
end)
end, {g.replica_1_b:replicaset_uuid(), g.replica_1_b:instance_uuid()})

-- Break 'info' request on replica so that it fails with TimedOut error.
g.replica_1_b:exec(function()
rawset(_G, 'old_call', ivshard.storage._call)
ivshard.storage._call = function(service_name, ...)
if service_name == 'info' then
ifiber.sleep(ivconst.CALL_TIMEOUT_MIN * 5)
end
return _G.old_call(service_name, ...)
end
end)

affect_priority_clear_net_timeout(g)
g.router:exec(function(rs_uuid, master_uuid)
local router = ivshard.router.internal.static_router
local rs = router.replicasets[rs_uuid]

-- And we change the prioritized replica.
ivshard.router.internal.errinj.ERRINJ_FAILOVER_DELAY = false
t.helpers.retrying({timeout = iwait_timeout}, function()
router.failover_fiber:wakeup()
t.assert_equals(rs.replica.uuid, master_uuid)
end)

-- Check, that prioritized replica is not changed, as it's still broken.
rawset(_G, 'old_up_timeout', ivconst.FAILOVER_UP_TIMEOUT)
ivconst.FAILOVER_UP_TIMEOUT = 0.01
ivtest.service_wait_for_new_ok(router.failover_service,
{on_yield = router.failover_fiber:wakeup()})
t.assert_equals(rs.replica.uuid, master_uuid)
end, {g.replica_1_b:replicaset_uuid(), g.replica_1_a:instance_uuid()})

-- Restore 'info' request.
g.replica_1_b:exec(function()
ivshard.storage._call = _G.old_call
_G.old_call = nil
end)

-- As replica_1_b has higher priority, it should be restored automatically.
g.router:exec(function(rs_uuid, replica_uuid)
local router = ivshard.router.internal.static_router
local rs = router.replicasets[rs_uuid]
t.assert_equals(rs.priority_list[1].uuid, replica_uuid)
t.helpers.retrying({timeout = iwait_timeout}, function()
router.failover_fiber:wakeup()
t.assert_equals(rs.replica.uuid, replica_uuid,
'Prioritized replica is not up yet')
end)
end, {g.replica_1_b:replicaset_uuid(), g.replica_1_b:instance_uuid()})

vtest.router_cfg(g.router, global_cfg)
g.router:exec(function()
ivconst.FAILOVER_UP_TIMEOUT = _G.old_up_timeout
_G.old_up_timeout = nil
end)
end

--
-- gh-483: user calls also affects priority. If several sequential requests
-- fail, then the same logic as in the previous test happens.
--
g.test_failed_calls_affect_priority = function()
prepare_affect_priority_rs(g)
local timeout = vconsts.CALL_TIMEOUT_MIN * 4

-- Find prioritized replica and disable failover for now.
g.router:exec(function(rs_uuid, replica_uuid)
local router = ivshard.router.internal.static_router
local rs = router.replicasets[rs_uuid]
local opts = {timeout = iwait_timeout}
rs:wait_connected_all(opts)

t.helpers.retrying(opts, function()
router.failover_fiber:wakeup()
t.assert_equals(rs.replica.uuid, replica_uuid,
'Prioritized replica have not been set yet')
end)

local errinj = ivshard.router.internal.errinj
errinj.ERRINJ_FAILOVER_DELAY = true
t.helpers.retrying(opts, function()
router.failover_fiber:wakeup()
t.assert_equals(errinj.ERRINJ_FAILOVER_DELAY, 'in',
'Failover have not been stopped yet')
end)

-- Discovery is disabled, as it may affect `net_sequential_fail`
-- and leads to flakiness of the test.
errinj.ERRINJ_LONG_DISCOVERY = true
t.helpers.retrying(opts, function()
router.discovery_fiber:wakeup()
t.assert_equals(errinj.ERRINJ_LONG_DISCOVERY, 'waiting',
'Discovery have not been stopped yet')
end)
end, {g.replica_1_b:replicaset_uuid(), g.replica_1_b:instance_uuid()})

-- Break 'info' request on replica so that it fails with TimedOut error.
-- No other request can be broken, as only failover changes priority and
-- as soon as it wakes up it succeeds with `_call` and sets
-- `net_sequential_fail` to 0.
g.replica_1_b:exec(function()
rawset(_G, 'old_call', ivshard.storage._call)
ivshard.storage._call = function(service_name, ...)
if service_name == 'info' then
ifiber.sleep(ivconst.CALL_TIMEOUT_MIN * 5)
end
return _G.old_call(service_name, ...)
end
end)

affect_priority_clear_net_timeout(g)
local bid = vtest.storage_first_bucket(g.replica_1_a)
g.router:exec(function(bid, timeout, rs_uuid, replica_uuid)
local router = ivshard.router.internal.static_router
local replica = router.replicasets[rs_uuid].replica
t.assert_equals(replica.uuid, replica_uuid)

local fails = replica.net_sequential_fail
for _ = 1, ivconst.FAILOVER_DOWN_SEQUENTIAL_FAIL do
local ok, err = ivshard.router.callro(bid, 'vshard.storage._call',
{'info'}, {timeout = timeout})
t.assert_not(ok)
t.assert(iverror.is_timeout(err))
end

t.assert_equals(replica.net_sequential_fail,
fails + ivconst.FAILOVER_DOWN_SEQUENTIAL_FAIL)

-- Priority is changed only by failover. So, the prioritized replica
-- is still the failing one.
t.assert_equals(router.replicasets[rs_uuid].replica.uuid, replica_uuid)
end, {bid, timeout, g.replica_1_b:replicaset_uuid(),
g.replica_1_b:instance_uuid()})

-- Enable failover, which changes priority of the replica.
g.router:exec(function(rs_uuid, master_uuid)
local router = ivshard.router.internal.static_router
ivshard.router.internal.errinj.ERRINJ_FAILOVER_DELAY = false
t.helpers.retrying({timeout = iwait_timeout}, function()
router.failover_fiber:wakeup()
t.assert_equals(router.replicasets[rs_uuid].replica.uuid,
master_uuid, 'Master is not prioritized yet')
end)
end, {g.replica_1_b:replicaset_uuid(), g.replica_1_a:instance_uuid()})

-- Restore 'info' request.
g.replica_1_b:exec(function()
ivshard.storage._call = _G.old_call
_G.old_call = nil
end)

-- As replica_1_b has higher priority, it should be restored automatically.
g.router:exec(function(rs_uuid, replica_uuid)
local old_up_timeout = ivconst.FAILOVER_UP_TIMEOUT
ivconst.FAILOVER_UP_TIMEOUT = 1
local router = ivshard.router.internal.static_router
local rs = router.replicasets[rs_uuid]
t.assert_equals(rs.priority_list[1].uuid, replica_uuid)
t.helpers.retrying({timeout = iwait_timeout}, function()
router.failover_fiber:wakeup()
t.assert_equals(rs.replica.uuid, replica_uuid,
'Prioritized replica is not up yet')
end)
ivconst.FAILOVER_UP_TIMEOUT = old_up_timeout
end, {g.replica_1_b:replicaset_uuid(), g.replica_1_b:instance_uuid()})

vtest.router_cfg(g.router, global_cfg)
end
7 changes: 4 additions & 3 deletions test/router/exponential_timeout.result
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,10 @@ util.map_evals(test_run, {REPLICASET_1, REPLICASET_2}, 'bootstrap_storage(\'memt
_ = test_run:cmd("create server router_1 with script='router/router_1.lua'")
---
...
-- Discovery algorithm changes sometimes and should not affect the
-- Discovery algorithm and failover changes sometimes and should not affect the
-- exponential timeout test.
_ = test_run:cmd("start server router_1 with args='discovery_disable'")
_ = test_run:cmd("start server router_1 with " .. \
"args='discovery_disable failover_disable'")
---
...
_ = test_run:switch('router_1')
Expand Down Expand Up @@ -103,7 +104,7 @@ util.collect_timeouts(rs1)
- - fail: 0
ok: 0
timeout: 0.5
- fail: 1
- fail: 2
ok: 0
timeout: 1
...
Expand Down
5 changes: 3 additions & 2 deletions test/router/exponential_timeout.test.lua
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,10 @@ util.wait_master(test_run, REPLICASET_1, 'storage_1_a')
util.wait_master(test_run, REPLICASET_2, 'storage_2_a')
util.map_evals(test_run, {REPLICASET_1, REPLICASET_2}, 'bootstrap_storage(\'memtx\')')
_ = test_run:cmd("create server router_1 with script='router/router_1.lua'")
-- Discovery algorithm changes sometimes and should not affect the
-- Discovery algorithm and failover changes sometimes and should not affect the
-- exponential timeout test.
_ = test_run:cmd("start server router_1 with args='discovery_disable'")
_ = test_run:cmd("start server router_1 with " .. \
"args='discovery_disable failover_disable'")
_ = test_run:switch('router_1')
util = require('util')

Expand Down
9 changes: 5 additions & 4 deletions test/router/retry_reads.result
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,10 @@ util.map_evals(test_run, {REPLICASET_1, REPLICASET_2}, 'bootstrap_storage(\'memt
_ = test_run:cmd("create server router_1 with script='router/router_1.lua'")
---
...
-- Discovery algorithm changes sometimes and should not affect the
-- read retry decisions.
_ = test_run:cmd("start server router_1 with args='discovery_disable'")
-- Discovery algorithm and failover changes sometimes and should not affect the
-- exponential timeout test.
_ = test_run:cmd("start server router_1 with " .. \
"args='discovery_disable failover_disable'")
---
...
_ = test_run:switch('router_1')
Expand Down Expand Up @@ -69,7 +70,7 @@ util.collect_timeouts(rs1)
- - fail: 0
ok: 0
timeout: 0.5
- fail: 1
- fail: 2
ok: 0
timeout: 1
...
Expand Down
7 changes: 4 additions & 3 deletions test/router/retry_reads.test.lua
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,10 @@ util.wait_master(test_run, REPLICASET_1, 'storage_1_a')
util.wait_master(test_run, REPLICASET_2, 'storage_2_a')
util.map_evals(test_run, {REPLICASET_1, REPLICASET_2}, 'bootstrap_storage(\'memtx\')')
_ = test_run:cmd("create server router_1 with script='router/router_1.lua'")
-- Discovery algorithm changes sometimes and should not affect the
-- read retry decisions.
_ = test_run:cmd("start server router_1 with args='discovery_disable'")
-- Discovery algorithm and failover changes sometimes and should not affect the
-- exponential timeout test.
_ = test_run:cmd("start server router_1 with " .. \
"args='discovery_disable failover_disable'")
_ = test_run:switch('router_1')
util = require('util')

Expand Down
3 changes: 2 additions & 1 deletion test/router/router.result
Original file line number Diff line number Diff line change
Expand Up @@ -1527,7 +1527,8 @@ table.sort(error_messages)
...
error_messages
---
- - Use replica:check_is_connected(...) instead of replica.check_is_connected(...)
- - Use replica:call(...) instead of replica.call(...)
- Use replica:check_is_connected(...) instead of replica.check_is_connected(...)
- Use replica:detach_conn(...) instead of replica.detach_conn(...)
- Use replica:is_connected(...) instead of replica.is_connected(...)
- Use replica:safe_uri(...) instead of replica.safe_uri(...)
Expand Down
1 change: 0 additions & 1 deletion test/router/router_1.lua

This file was deleted.

35 changes: 35 additions & 0 deletions test/router/router_1.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
#!/usr/bin/env tarantool

require('strict').on()
fiber = require('fiber')

-- Check if we are running under test-run
if os.getenv('ADMIN') then
test_run = require('test_run').new()
require('console').listen(os.getenv('ADMIN'))
end

replicasets = {'cbf06940-0790-498b-948d-042b62cf3d29',
'ac522f65-aa94-4134-9f64-51ee384f1a54'}

-- Call a configuration provider
cfg = dofile('localcfg.lua')
if arg[1] == 'discovery_disable' then
cfg.discovery_mode = 'off'
end

-- Start the database with sharding
vshard = require('vshard')

if arg[2] == 'failover_disable' then
vshard.router.internal.errinj.ERRINJ_FAILOVER_DELAY = true
end

vshard.router.cfg(cfg)

if arg[2] == 'failover_disable' then
while vshard.router.internal.errinj.ERRINJ_FAILOVER_DELAY ~= 'in' do
router.failover_fiber:wakeup()
fiber.sleep(0.01)
end
end
1 change: 1 addition & 0 deletions vshard/consts.lua
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ return {
CALL_TIMEOUT_MAX = 64;
FAILOVER_UP_TIMEOUT = 5;
FAILOVER_DOWN_TIMEOUT = 1;
FAILOVER_DOWN_SEQUENTIAL_FAIL = 3;
DEFAULT_FAILOVER_PING_TIMEOUT = 5;
DEFAULT_SYNC_TIMEOUT = 1;
RECONNECT_TIMEOUT = 0.5;
Expand Down
Loading

0 comments on commit 9fc976d

Please sign in to comment.