diff --git a/test/instances/router.lua b/test/instances/router.lua
index a9505e66..6614539e 100755
--- a/test/instances/router.lua
+++ b/test/instances/router.lua
@@ -11,6 +11,7 @@ _G.ilt = require('luatest')
 _G.imsgpack = require('msgpack')
 _G.ivtest = require('test.luatest_helpers.vtest')
 _G.ivconst = require('vshard.consts')
+_G.iverror = require('vshard.error')
 _G.iwait_timeout = _G.ivtest.wait_timeout
 
 -- Do not load entire vshard into the global namespace to catch errors when code
diff --git a/test/router-luatest/router_test.lua b/test/router-luatest/router_test.lua
index 776c1337..b2e12951 100644
--- a/test/router-luatest/router_test.lua
+++ b/test/router-luatest/router_test.lua
@@ -1,6 +1,7 @@
 local t = require('luatest')
 local vtest = require('test.luatest_helpers.vtest')
 local vutil = require('vshard.util')
+local vconsts = require('vshard.consts')
 
 local g = t.group('router')
 local cfg_template = {
@@ -861,3 +862,245 @@ g.test_request_timeout = function(g)
         _G.sleep_num = nil
     end)
 end
+
+local function prepare_affect_priority_rs(g)
+    local new_cfg_template = table.deepcopy(cfg_template)
+    new_cfg_template.sharding[1].replicas.replica_1_a.zone = 3
+    new_cfg_template.sharding[1].replicas.replica_1_b.zone = 2
+    new_cfg_template.zone = 1
+    new_cfg_template.weights = {
+        [1] = {
+            [1] = 0,
+            [2] = 1,
+            [3] = 2,
+        },
+    }
+    -- So that the ping timeout is always > replica.net_timeout.
+    -- net_timeout starts with CALL_TIMEOUT_MIN and is multiplied by 2 if the
+    -- number of failed requests is >= 2.
+    new_cfg_template.failover_ping_timeout = vconsts.CALL_TIMEOUT_MIN * 4
+    local new_cluster_cfg = vtest.config_new(new_cfg_template)
+    vtest.router_cfg(g.router, new_cluster_cfg)
+end
+
+local function affect_priority_clear_net_timeout(g)
+    g.router:exec(function()
+        -- Reset net_timeout so that it doesn't affect the test. This is
+        -- needed as we use the absolute minimum failover_ping_timeout for
+        -- FAILOVER_DOWN_SEQUENTIAL_FAIL = 3. 10 successful calls would be
+        -- needed to restore it to CALL_TIMEOUT_MIN without a reset.
+        local router = ivshard.router.internal.static_router
+        for _, rs in pairs(router.replicasets) do
+            for _, r in pairs(rs.replicas) do
+                r.net_timeout = ivconst.CALL_TIMEOUT_MIN
+            end
+        end
+    end)
+end
+
+--
+-- gh-483: failover ping temporarily lowers a replica's priority when it
+-- cannot be reached several times in a row:
+--
+-- 1. replica_1_b is the prioritized one. replica_1_a is the second one.
+-- 2. The router establishes connections to all instances, failover sets
+--    replica_1_b as the prioritized one.
+-- 3. The node breaks and stops responding.
+-- 4. Failover retries the ping FAILOVER_DOWN_SEQUENTIAL_FAIL times and
+--    switches the prioritized replica to the lower-priority one. Note that
+--    the connection is recreated on every failed ping.
+-- 5. Every FAILOVER_UP_TIMEOUT seconds failover checks whether any replica
+--    with a higher priority can be reached and, if so, switches back to it.
+--
+g.test_failover_ping_affects_priority = function()
+    prepare_affect_priority_rs(g)
+
+    -- Find prioritized replica and disable failover for now.
+    g.router:exec(function(rs_uuid, replica_uuid)
+        local router = ivshard.router.internal.static_router
+        local rs = router.replicasets[rs_uuid]
+        local opts = {timeout = iwait_timeout}
+        rs:wait_connected_all(opts)
+
+        t.helpers.retrying(opts, function()
+            router.failover_fiber:wakeup()
+            t.assert_equals(rs.replica.uuid, replica_uuid,
+                            'Prioritized replica has not been set yet')
+        end)
+
+        local errinj = ivshard.router.internal.errinj
+        errinj.ERRINJ_FAILOVER_DELAY = true
+        t.helpers.retrying(opts, function()
+            router.failover_fiber:wakeup()
+            t.assert_equals(errinj.ERRINJ_FAILOVER_DELAY, 'in',
+                            'Failover has not been stopped yet')
+        end)
+    end, {g.replica_1_b:replicaset_uuid(), g.replica_1_b:instance_uuid()})
+
+    -- Break 'info' request on replica so that it fails with TimedOut error.
+    g.replica_1_b:exec(function()
+        rawset(_G, 'old_call', ivshard.storage._call)
+        ivshard.storage._call = function(service_name, ...)
+            if service_name == 'info' then
+                ifiber.sleep(ivconst.CALL_TIMEOUT_MIN * 5)
+            end
+            return _G.old_call(service_name, ...)
+        end
+    end)
+
+    affect_priority_clear_net_timeout(g)
+    g.router:exec(function(rs_uuid, master_uuid)
+        local router = ivshard.router.internal.static_router
+        local rs = router.replicasets[rs_uuid]
+
+        -- Enable failover and let it change the prioritized replica.
+        ivshard.router.internal.errinj.ERRINJ_FAILOVER_DELAY = false
+        t.helpers.retrying({timeout = iwait_timeout}, function()
+            router.failover_fiber:wakeup()
+            t.assert_equals(rs.replica.uuid, master_uuid)
+        end)
+
+        -- Check that the prioritized replica is unchanged: it's still broken.
+        rawset(_G, 'old_up_timeout', ivconst.FAILOVER_UP_TIMEOUT)
+        ivconst.FAILOVER_UP_TIMEOUT = 0.01
+        ivtest.service_wait_for_new_ok(router.failover_service,
+            {on_yield = function() router.failover_fiber:wakeup() end})
+        t.assert_equals(rs.replica.uuid, master_uuid)
+    end, {g.replica_1_b:replicaset_uuid(), g.replica_1_a:instance_uuid()})
+
+    -- Restore 'info' request.
+    g.replica_1_b:exec(function()
+        ivshard.storage._call = _G.old_call
+        _G.old_call = nil
+    end)
+
+    -- As replica_1_b has higher priority, it should be restored automatically.
+    g.router:exec(function(rs_uuid, replica_uuid)
+        local router = ivshard.router.internal.static_router
+        local rs = router.replicasets[rs_uuid]
+        t.assert_equals(rs.priority_list[1].uuid, replica_uuid)
+        t.helpers.retrying({timeout = iwait_timeout}, function()
+            router.failover_fiber:wakeup()
+            t.assert_equals(rs.replica.uuid, replica_uuid,
+                            'Prioritized replica is not up yet')
+        end)
+    end, {g.replica_1_b:replicaset_uuid(), g.replica_1_b:instance_uuid()})
+
+    vtest.router_cfg(g.router, global_cfg)
+    g.router:exec(function()
+        ivconst.FAILOVER_UP_TIMEOUT = _G.old_up_timeout
+        _G.old_up_timeout = nil
+    end)
+end
+
+--
+-- gh-483: user calls also affect priority. If several sequential requests
+-- fail, then the same logic as in the previous test applies.
+--
+g.test_failed_calls_affect_priority = function()
+    prepare_affect_priority_rs(g)
+    local timeout = vconsts.CALL_TIMEOUT_MIN * 4
+
+    -- Find prioritized replica and disable failover for now.
+    g.router:exec(function(rs_uuid, replica_uuid)
+        local router = ivshard.router.internal.static_router
+        local rs = router.replicasets[rs_uuid]
+        local opts = {timeout = iwait_timeout}
+        rs:wait_connected_all(opts)
+
+        t.helpers.retrying(opts, function()
+            router.failover_fiber:wakeup()
+            t.assert_equals(rs.replica.uuid, replica_uuid,
+                            'Prioritized replica has not been set yet')
+        end)
+
+        local errinj = ivshard.router.internal.errinj
+        errinj.ERRINJ_FAILOVER_DELAY = true
+        t.helpers.retrying(opts, function()
+            router.failover_fiber:wakeup()
+            t.assert_equals(errinj.ERRINJ_FAILOVER_DELAY, 'in',
+                            'Failover has not been stopped yet')
+        end)
+
+        -- Discovery is disabled, as it may affect `net_sequential_fail`
+        -- and lead to flakiness of the test.
+        errinj.ERRINJ_LONG_DISCOVERY = true
+        t.helpers.retrying(opts, function()
+            router.discovery_fiber:wakeup()
+            t.assert_equals(errinj.ERRINJ_LONG_DISCOVERY, 'waiting',
+                            'Discovery has not been stopped yet')
+        end)
+    end, {g.replica_1_b:replicaset_uuid(), g.replica_1_b:instance_uuid()})
+
+    -- Break 'info' request on replica so that it fails with TimedOut error.
+    -- No other request can be broken, as only failover changes the priority,
+    -- and as soon as it wakes up, it succeeds with `_call` and resets
+    -- `net_sequential_fail` to 0.
+    g.replica_1_b:exec(function()
+        rawset(_G, 'old_call', ivshard.storage._call)
+        ivshard.storage._call = function(service_name, ...)
+            if service_name == 'info' then
+                ifiber.sleep(ivconst.CALL_TIMEOUT_MIN * 5)
+            end
+            return _G.old_call(service_name, ...)
+        end
+    end)
+
+    affect_priority_clear_net_timeout(g)
+    local bid = vtest.storage_first_bucket(g.replica_1_a)
+    g.router:exec(function(bid, timeout, rs_uuid, replica_uuid)
+        local router = ivshard.router.internal.static_router
+        local replica = router.replicasets[rs_uuid].replica
+        t.assert_equals(replica.uuid, replica_uuid)
+
+        local fails = replica.net_sequential_fail
+        for _ = 1, ivconst.FAILOVER_DOWN_SEQUENTIAL_FAIL do
+            local ok, err = ivshard.router.callro(bid, 'vshard.storage._call',
+                                                  {'info'}, {timeout = timeout})
+            t.assert_not(ok)
+            t.assert(iverror.is_timeout(err))
+        end
+
+        t.assert_equals(replica.net_sequential_fail,
+                        fails + ivconst.FAILOVER_DOWN_SEQUENTIAL_FAIL)
+
+        -- Priority is changed only by failover, so the prioritized replica
+        -- is still the failing one.
+        t.assert_equals(router.replicasets[rs_uuid].replica.uuid, replica_uuid)
+    end, {bid, timeout, g.replica_1_b:replicaset_uuid(),
+          g.replica_1_b:instance_uuid()})
+
+    -- Enable failover, which changes the priority of the replica.
+    g.router:exec(function(rs_uuid, master_uuid)
+        local router = ivshard.router.internal.static_router
+        ivshard.router.internal.errinj.ERRINJ_FAILOVER_DELAY = false
+        t.helpers.retrying({timeout = iwait_timeout}, function()
+            router.failover_fiber:wakeup()
+            t.assert_equals(router.replicasets[rs_uuid].replica.uuid,
+                            master_uuid, 'Master is not prioritized yet')
+        end)
+    end, {g.replica_1_b:replicaset_uuid(), g.replica_1_a:instance_uuid()})
+
+    -- Restore 'info' request.
+    g.replica_1_b:exec(function()
+        ivshard.storage._call = _G.old_call
+        _G.old_call = nil
+    end)
+
+    -- As replica_1_b has higher priority, it should be restored automatically.
+    g.router:exec(function(rs_uuid, replica_uuid)
+        local old_up_timeout = ivconst.FAILOVER_UP_TIMEOUT
+        ivconst.FAILOVER_UP_TIMEOUT = 1
+        local router = ivshard.router.internal.static_router
+        local rs = router.replicasets[rs_uuid]
+        t.assert_equals(rs.priority_list[1].uuid, replica_uuid)
+        t.helpers.retrying({timeout = iwait_timeout}, function()
+            router.failover_fiber:wakeup()
+            t.assert_equals(rs.replica.uuid, replica_uuid,
+                            'Prioritized replica is not up yet')
+        end)
+        ivconst.FAILOVER_UP_TIMEOUT = old_up_timeout
+    end, {g.replica_1_b:replicaset_uuid(), g.replica_1_b:instance_uuid()})
+
+    vtest.router_cfg(g.router, global_cfg)
+end
diff --git a/test/router/exponential_timeout.result b/test/router/exponential_timeout.result
index 252b816c..bd62b739 100644
--- a/test/router/exponential_timeout.result
+++ b/test/router/exponential_timeout.result
@@ -28,9 +28,10 @@ util.map_evals(test_run, {REPLICASET_1, REPLICASET_2}, 'bootstrap_storage(\'memt
 _ = test_run:cmd("create server router_1 with script='router/router_1.lua'")
 ---
 ...
--- Discovery algorithm changes sometimes and should not affect the
+-- Discovery algorithm and failover change sometimes and should not affect the
 -- exponential timeout test.
-_ = test_run:cmd("start server router_1 with args='discovery_disable'")
+_ = test_run:cmd("start server router_1 with " .. \
+    "args='discovery_disable failover_disable'")
 ---
 ...
 _ = test_run:switch('router_1')
@@ -103,7 +104,7 @@ util.collect_timeouts(rs1)
 - - fail: 0
     ok: 0
     timeout: 0.5
-  - fail: 1
+  - fail: 2
     ok: 0
     timeout: 1
 ...
diff --git a/test/router/exponential_timeout.test.lua b/test/router/exponential_timeout.test.lua
index 881b9a7e..ab677d60 100644
--- a/test/router/exponential_timeout.test.lua
+++ b/test/router/exponential_timeout.test.lua
@@ -10,9 +10,10 @@ util.wait_master(test_run, REPLICASET_1, 'storage_1_a')
 util.wait_master(test_run, REPLICASET_2, 'storage_2_a')
 util.map_evals(test_run, {REPLICASET_1, REPLICASET_2}, 'bootstrap_storage(\'memtx\')')
 _ = test_run:cmd("create server router_1 with script='router/router_1.lua'")
--- Discovery algorithm changes sometimes and should not affect the
+-- Discovery algorithm and failover change sometimes and should not affect the
 -- exponential timeout test.
-_ = test_run:cmd("start server router_1 with args='discovery_disable'")
+_ = test_run:cmd("start server router_1 with " .. \
+    "args='discovery_disable failover_disable'")
 
 _ = test_run:switch('router_1')
 util = require('util')
diff --git a/test/router/retry_reads.result b/test/router/retry_reads.result
index 80834750..e057422f 100644
--- a/test/router/retry_reads.result
+++ b/test/router/retry_reads.result
@@ -28,9 +28,10 @@ util.map_evals(test_run, {REPLICASET_1, REPLICASET_2}, 'bootstrap_storage(\'memt
 _ = test_run:cmd("create server router_1 with script='router/router_1.lua'")
 ---
 ...
--- Discovery algorithm changes sometimes and should not affect the
--- read retry decisions.
+-- Discovery algorithm and failover change sometimes and should not affect the
+-- read retry decisions.
+_ = test_run:cmd("start server router_1 with " .. \
+    "args='discovery_disable failover_disable'")
 ---
 ...
 _ = test_run:switch('router_1')
@@ -69,7 +70,7 @@ util.collect_timeouts(rs1)
 - - fail: 0
     ok: 0
     timeout: 0.5
-  - fail: 1
+  - fail: 2
     ok: 0
     timeout: 1
 ...
diff --git a/test/router/retry_reads.test.lua b/test/router/retry_reads.test.lua
index f0a44457..a9554d77 100644
--- a/test/router/retry_reads.test.lua
+++ b/test/router/retry_reads.test.lua
@@ -10,9 +10,10 @@ util.wait_master(test_run, REPLICASET_1, 'storage_1_a')
 util.wait_master(test_run, REPLICASET_2, 'storage_2_a')
 util.map_evals(test_run, {REPLICASET_1, REPLICASET_2}, 'bootstrap_storage(\'memtx\')')
 _ = test_run:cmd("create server router_1 with script='router/router_1.lua'")
--- Discovery algorithm changes sometimes and should not affect the
--- read retry decisions.
+-- Discovery algorithm and failover change sometimes and should not affect the
+-- read retry decisions.
+_ = test_run:cmd("start server router_1 with " .. \
+    "args='discovery_disable failover_disable'")
 
 _ = test_run:switch('router_1')
 util = require('util')
diff --git a/test/router/router.result b/test/router/router.result
index 4389367f..8f678564 100644
--- a/test/router/router.result
+++ b/test/router/router.result
@@ -1527,7 +1527,8 @@ table.sort(error_messages)
 ...
 error_messages
 ---
-- - Use replica:check_is_connected(...) instead of replica.check_is_connected(...)
+- - Use replica:call(...) instead of replica.call(...)
+  - Use replica:check_is_connected(...) instead of replica.check_is_connected(...)
   - Use replica:detach_conn(...) instead of replica.detach_conn(...)
   - Use replica:is_connected(...) instead of replica.is_connected(...)
   - Use replica:safe_uri(...) instead of replica.safe_uri(...)
diff --git a/test/router/router_1.lua b/test/router/router_1.lua
deleted file mode 120000
index da63b08a..00000000
--- a/test/router/router_1.lua
+++ /dev/null
@@ -1 +0,0 @@
-../../example/router_1.lua
\ No newline at end of file
diff --git a/test/router/router_1.lua b/test/router/router_1.lua
new file mode 100755
index 00000000..887c43bc
--- /dev/null
+++ b/test/router/router_1.lua
@@ -0,0 +1,35 @@
+#!/usr/bin/env tarantool
+
+require('strict').on()
+fiber = require('fiber')
+
+-- Check if we are running under test-run
+if os.getenv('ADMIN') then
+    test_run = require('test_run').new()
+    require('console').listen(os.getenv('ADMIN'))
+end
+
+replicasets = {'cbf06940-0790-498b-948d-042b62cf3d29',
+               'ac522f65-aa94-4134-9f64-51ee384f1a54'}
+
+-- Call a configuration provider
+cfg = dofile('localcfg.lua')
+if arg[1] == 'discovery_disable' then
+    cfg.discovery_mode = 'off'
+end
+
+-- Start the database with sharding
+vshard = require('vshard')
+
+if arg[2] == 'failover_disable' then
+    vshard.router.internal.errinj.ERRINJ_FAILOVER_DELAY = true
+end
+
+vshard.router.cfg(cfg)
+
+if arg[2] == 'failover_disable' then
+    while vshard.router.internal.errinj.ERRINJ_FAILOVER_DELAY ~= 'in' do
+        vshard.router.internal.static_router.failover_fiber:wakeup()
+        fiber.sleep(0.01)
+    end
+end
diff --git a/vshard/consts.lua b/vshard/consts.lua
index 249d4915..f8bedc4b 100644
--- a/vshard/consts.lua
+++ b/vshard/consts.lua
@@ -49,6 +49,7 @@ return {
     CALL_TIMEOUT_MAX = 64;
     FAILOVER_UP_TIMEOUT = 5;
     FAILOVER_DOWN_TIMEOUT = 1;
+    FAILOVER_DOWN_SEQUENTIAL_FAIL = 3;
     DEFAULT_FAILOVER_PING_TIMEOUT = 5;
     DEFAULT_SYNC_TIMEOUT = 1;
     RECONNECT_TIMEOUT = 0.5;
diff --git a/vshard/replicaset.lua b/vshard/replicaset.lua
index d38ef49a..abdffe67 100644
--- a/vshard/replicaset.lua
+++ b/vshard/replicaset.lua
@@ -483,8 +483,10 @@ end
 --
 local function replicaset_down_replica_priority(replicaset)
     local old_replica = replicaset.replica
-    assert(old_replica and old_replica.down_ts and
-           not old_replica:is_connected())
+    assert(old_replica and ((old_replica.down_ts and
+            not old_replica:is_connected()) or
+           old_replica.net_sequential_fail >=
+           consts.FAILOVER_DOWN_SEQUENTIAL_FAIL))
     local new_replica = old_replica.next_by_priority
     if new_replica then
         assert(new_replica ~= old_replica)
@@ -511,7 +513,8 @@ local function replicaset_up_replica_priority(replicaset)
             -- Failed to up priority.
             return
         end
-        if replica:is_connected() then
+        if replica:is_connected() and replica.net_sequential_ok > 0 then
+            assert(replica.net_sequential_fail == 0)
             replicaset.replica = replica
             assert(not old_replica or
                    old_replica.weight >= replicaset.replica.weight)
@@ -527,15 +530,12 @@ end
 --
 local function replica_on_failed_request(replica)
     replica.net_sequential_ok = 0
-    local val = replica.net_sequential_fail + 1
-    if val >= 2 then
+    replica.net_sequential_fail = replica.net_sequential_fail + 1
+    if replica.net_sequential_fail >= 2 then
         local new_timeout = replica.net_timeout * 2
         if new_timeout <= consts.CALL_TIMEOUT_MAX then
             replica.net_timeout = new_timeout
         end
-        replica.net_sequential_fail = 1
-    else
-        replica.net_sequential_fail = val
     end
 end
@@ -1268,6 +1268,7 @@ local replica_mt = {
             return util.uri_format(uri)
         end,
         detach_conn = replica_detach_conn,
+        call = replica_call,
     },
     __tostring = function(replica)
         if replica.name then
diff --git a/vshard/router/init.lua b/vshard/router/init.lua
index 6ac80b79..cbd978ef 100644
--- a/vshard/router/init.lua
+++ b/vshard/router/init.lua
@@ -44,6 +44,7 @@ if not M then
         ERRINJ_CFG = false,
         ERRINJ_CFG_DELAY = false,
         ERRINJ_FAILOVER_CHANGE_CFG = false,
+        ERRINJ_FAILOVER_DELAY = false,
         ERRINJ_RELOAD = false,
         ERRINJ_LONG_DISCOVERY = false,
         ERRINJ_MASTER_SEARCH_DELAY = false,
@@ -973,41 +974,26 @@ end
 -- Failover
 --------------------------------------------------------------------------------
 
-local function failover_ping_round(router)
-    for _, replicaset in pairs(router.replicasets) do
-        local replica = replicaset.replica
-        if replica ~= nil and replica.conn ~= nil and
-           replica.down_ts == nil then
-            if not replica.conn:ping({timeout =
-                                      router.failover_ping_timeout}) then
-                log.info('Ping error from %s: perhaps a connection is down',
-                         replica)
-                -- Connection hangs. Recreate it to be able to
-                -- fail over to a replica next by priority. The
-                -- old connection is not closed in case if it just
-                -- processes too big response at this moment. Any
-                -- way it will be eventually garbage collected
-                -- and closed.
-                replica:detach_conn()
-                replicaset:connect_replica(replica)
-            end
-        end
-    end
-end
-
 --
 -- Replicaset must fall its replica connection to lower priority,
 -- if the current one is down too long.
 --
 local function failover_need_down_priority(replicaset, curr_ts)
     local r = replicaset.replica
+    if not r or not r.next_by_priority then
+        return false
+    end
     -- down_ts not nil does not mean that the replica is not
     -- connected. Probably it is connected and now fetches schema,
     -- or does authorization. Either case, it is healthy, no need
     -- to down the prio.
-    return r and r.down_ts and not r:is_connected() and
-           curr_ts - r.down_ts >= consts.FAILOVER_DOWN_TIMEOUT
-           and r.next_by_priority
+    local is_down_ts = r.down_ts and not r:is_connected() and
+                       curr_ts - r.down_ts >= consts.FAILOVER_DOWN_TIMEOUT
+    -- If we failed several sequential requests to the replica, then
+    -- something is wrong with it. Temporarily lower its priority.
+    local is_sequential_fails =
+        r.net_sequential_fail >= consts.FAILOVER_DOWN_SEQUENTIAL_FAIL
+    return is_down_ts or is_sequential_fails
 end
 
 --
@@ -1035,6 +1021,49 @@ local function failover_collect_to_update(router)
     return id_to_update
 end
 
+local function failover_ping(replica, opts)
+    return replica:call('vshard.storage._call', {'info'}, opts)
+end
+
+local function failover_ping_round(router, curr_ts)
+    local opts = {timeout = router.failover_ping_timeout}
+    for _, replicaset in pairs(router.replicasets) do
+        local replica = replicaset.replica
+        if failover_need_up_priority(replicaset, curr_ts) then
+            -- When it is time to increase priority in a replicaset, all the
+            -- instances whose priority is higher than the current one are
+            -- pinged, so that we know which connections work properly.
+            for _, r in pairs(replicaset.priority_list) do
+                if r == replica then
+                    break
+                end
+                if r.conn ~= nil and r.down_ts == nil then
+                    -- We don't need the return values: r.net_sequential_ok is
+                    -- incremented on a successful request.
+                    failover_ping(r, opts)
+                end
+            end
+        end
+        if replica ~= nil and replica.conn ~= nil and
+           replica.down_ts == nil then
+            local net_status, _, err = failover_ping(replica, opts)
+            if not net_status then
+                log.info('Ping error from %s: perhaps a connection is down: %s',
+                         replica, err)
+                -- Connection hangs. Recreate it to be able to
+                -- fail over to a replica next by priority. The
+                -- old connection is not closed in case if it just
+                -- processes too big response at this moment. Any
+                -- way it will be eventually garbage collected
+                -- and closed.
+                replica:detach_conn()
+                replicaset:connect_replica(replica)
+            end
+        end
+    end
+end
+
+
 --
 -- Detect not optimal or disconnected replicas. For not optimal
 -- try to update them to optimal, and down priority of
@@ -1042,12 +1071,12 @@ end
 -- @retval true A replica of an replicaset has been changed.
 --
 local function failover_step(router)
-    failover_ping_round(router)
+    local curr_ts = fiber_clock()
+    failover_ping_round(router, curr_ts)
     local id_to_update = failover_collect_to_update(router)
     if #id_to_update == 0 then
         return false
     end
-    local curr_ts = fiber_clock()
     local replica_is_changed = false
     for _, id in pairs(id_to_update) do
         local rs = router.replicasets[id]
@@ -1105,6 +1134,12 @@ local function failover_service_f(router, service)
     -- each min_timeout seconds.
     local prev_was_ok = false
     while module_version == M.module_version do
+        if M.errinj.ERRINJ_FAILOVER_DELAY then
+            M.errinj.ERRINJ_FAILOVER_DELAY = 'in'
+            repeat
+                lfiber.sleep(0.001)
+            until not M.errinj.ERRINJ_FAILOVER_DELAY
+        end
        service:next_iter()
        service:set_activity('updating replicas')
        local ok, replica_is_changed = pcall(failover_step, router)
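Editor's note: below is a small standalone Lua sketch, not part of the patch, that mirrors the priority-lowering decision introduced above: a replica accumulates net_sequential_fail on failed requests, and once the counter reaches FAILOVER_DOWN_SEQUENTIAL_FAIL the failover logic may switch to the next replica by priority. The names fake_replica, simulate_request and need_down_priority are hypothetical and exist only for illustration; the real logic lives in vshard/replicaset.lua and vshard/router/init.lua as changed in this diff.

-- Constant copied from vshard/consts.lua as changed in this diff.
local FAILOVER_DOWN_SEQUENTIAL_FAIL = 3

-- A minimal model of the per-replica counters used by the patch.
local fake_replica = {
    net_sequential_ok = 0,
    net_sequential_fail = 0,
}

-- Roughly what the success/failure request hooks do with the sequential
-- counters (net_timeout doubling is omitted here).
local function simulate_request(replica, ok)
    if ok then
        replica.net_sequential_fail = 0
        replica.net_sequential_ok = replica.net_sequential_ok + 1
    else
        replica.net_sequential_ok = 0
        replica.net_sequential_fail = replica.net_sequential_fail + 1
    end
end

-- The new condition added to failover_need_down_priority(): too many
-- sequential failures are enough to lower the priority, even if the
-- connection is formally alive.
local function need_down_priority(replica)
    return replica.net_sequential_fail >= FAILOVER_DOWN_SEQUENTIAL_FAIL
end

for i = 1, FAILOVER_DOWN_SEQUENTIAL_FAIL do
    simulate_request(fake_replica, false)
    print(('fail #%d, need_down_priority = %s'):format(
          i, tostring(need_down_priority(fake_replica))))
end

-- One successful request resets the counter; the higher-priority replica can
-- then be restored on a later FAILOVER_UP_TIMEOUT round.
simulate_request(fake_replica, true)
assert(not need_down_priority(fake_replica))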