From b36e90990c999fa88eb0458d6eb10c6ac034d76e Mon Sep 17 00:00:00 2001 From: Nikita Zheleztsov Date: Fri, 16 Aug 2024 16:12:28 +0300 Subject: [PATCH] router: calls affect temporary prioritized replica Previously prioritized replica was changed only if it was disconnected for FAILOVER_DOWN_TIMEOUT seconds. However, if connection is shows as 'connected' it doesn't mean, that this connection actually works. The connection must be pingable in order to be operational. This commit makes failover temporary lower replica's priority if FAILOVER_DOWN_SEQUENTIAL_FAIL requests fail to it. All vshard internal requests (including failover ping) and all user calls affect the number of sequentially failed requests. Note, that we consider request failed, when net.box connection is not operational (cannot make conn.call, e.g. connection is not yet established or timeout is reached), user functions throwing errors won't affect prioritized replica. The behavior of failover is the following after this commit: 1. Failover pings all prioritized replicas. If ping doesn't succeed, the connection is recreated, which is needed, if user returns too big values from the functions, in such case no other request can be done until this value is returned. Failed ping affects the number of sequentially failed requests. 2. If connection is down for >= than FAILOVER_DOWN_TIMEOUT or if the number of sequentially failed requests is >= FAILOVER_DOWN_SEQUENTIAL_FAIL, than we take replica with lower priority as the main one. 3. If failover didn't try to use the more prioritized replica (according to weights) for more than FAILOVER_UP_TIMEOUT, then we try to set a new replica as the prioritized one. Note, that we don't set it, if ping to it didn't succeed during ping round in (1). Closes tarantool/vshard#483 NO_DOC=bugfix --- test/instances/router.lua | 1 + test/router-luatest/router_test.lua | 243 +++++++++++++++++++++++ test/router/exponential_timeout.result | 7 +- test/router/exponential_timeout.test.lua | 5 +- test/router/retry_reads.result | 9 +- test/router/retry_reads.test.lua | 7 +- test/router/router.result | 3 +- test/router/router_1.lua | 36 +++- vshard/consts.lua | 1 + vshard/replicaset.lua | 17 +- vshard/router/init.lua | 89 ++++++--- 11 files changed, 369 insertions(+), 49 deletions(-) mode change 120000 => 100755 test/router/router_1.lua diff --git a/test/instances/router.lua b/test/instances/router.lua index a9505e66..6614539e 100755 --- a/test/instances/router.lua +++ b/test/instances/router.lua @@ -11,6 +11,7 @@ _G.ilt = require('luatest') _G.imsgpack = require('msgpack') _G.ivtest = require('test.luatest_helpers.vtest') _G.ivconst = require('vshard.consts') +_G.iverror = require('vshard.error') _G.iwait_timeout = _G.ivtest.wait_timeout -- Do not load entire vshard into the global namespace to catch errors when code diff --git a/test/router-luatest/router_test.lua b/test/router-luatest/router_test.lua index 776c1337..b2e12951 100644 --- a/test/router-luatest/router_test.lua +++ b/test/router-luatest/router_test.lua @@ -1,6 +1,7 @@ local t = require('luatest') local vtest = require('test.luatest_helpers.vtest') local vutil = require('vshard.util') +local vconsts = require('vshard.consts') local g = t.group('router') local cfg_template = { @@ -861,3 +862,245 @@ g.test_request_timeout = function(g) _G.sleep_num = nil end) end + +local function prepare_affect_priority_rs(g) + local new_cfg_template = table.deepcopy(cfg_template) + new_cfg_template.sharding[1].replicas.replica_1_a.zone = 3 + new_cfg_template.sharding[1].replicas.replica_1_b.zone = 2 + new_cfg_template.zone = 1 + new_cfg_template.weights = { + [1] = { + [1] = 0, + [2] = 1, + [3] = 2, + }, + } + -- So that ping timeout is always > replica.net_timeout. + -- net_timeout starts with CALL_TIMEOUT_MIN and is mutiplied by 2 if number + -- of failed requests is >= 2. + new_cfg_template.failover_ping_timeout = vconsts.CALL_TIMEOUT_MIN * 4 + local new_cluster_cfg = vtest.config_new(new_cfg_template) + vtest.router_cfg(g.router, new_cluster_cfg) +end + +local function affect_priority_clear_net_timeout(g) + g.router:exec(function() + -- Reset net_timeout, so that it doesn't affect the test. This is + -- needed as we use the absolute minimum failover_ping_timeout for + -- FAILOVER_DOWN_SEQUENTIAL_FAIL = 3. 10 successful calls are needed + -- to restore it to CALL_TIMEOUT_MIN wthout reset. + local router = ivshard.router.internal.static_router + for _, rs in pairs(router.replicasets) do + for _, r in pairs(rs.replicas) do + r.net_timeout = ivconst.CALL_TIMEOUT_MIN + end + end + end) +end + +-- +-- gh-483: failover ping temporary lower replica's priority, when it cannot be +-- reached several times in a row: +-- +-- 1. replica_1_b is the prioritized one. replica_1_a is the second one. +-- 2. router establishes connection to all instances, failover sets prioritized +-- replica_1_b. +-- 3. Node breaks and stops to respond. +-- 4. Failover retries ping FAILOVER_DOWN_SEQUENTIAL_FAIL times and changes +-- prioritized replica to the lower one. Note, that connection is recreated +-- on every failed ping. +-- 5. Every FAILOVER_UP_TIMEOUT failover checks, if any replica with higher +-- priority can be reached and changes the prioritized replica if it's so. +-- +g.test_failover_ping_affects_priority = function() + prepare_affect_priority_rs(g) + + -- Find prioritized replica and disable failover for now. + g.router:exec(function(rs_uuid, replica_uuid) + local router = ivshard.router.internal.static_router + local rs = router.replicasets[rs_uuid] + local opts = {timeout = iwait_timeout} + rs:wait_connected_all(opts) + + t.helpers.retrying(opts, function() + router.failover_fiber:wakeup() + t.assert_equals(rs.replica.uuid, replica_uuid, + 'Prioritized replica have not been set yet') + end) + + local errinj = ivshard.router.internal.errinj + errinj.ERRINJ_FAILOVER_DELAY = true + t.helpers.retrying(opts, function() + router.failover_fiber:wakeup() + t.assert_equals(errinj.ERRINJ_FAILOVER_DELAY, 'in', + 'Failover have not been stopped yet') + end) + end, {g.replica_1_b:replicaset_uuid(), g.replica_1_b:instance_uuid()}) + + -- Break 'info' request on replica so that it fails with TimedOut error. + g.replica_1_b:exec(function() + rawset(_G, 'old_call', ivshard.storage._call) + ivshard.storage._call = function(service_name, ...) + if service_name == 'info' then + ifiber.sleep(ivconst.CALL_TIMEOUT_MIN * 5) + end + return _G.old_call(service_name, ...) + end + end) + + affect_priority_clear_net_timeout(g) + g.router:exec(function(rs_uuid, master_uuid) + local router = ivshard.router.internal.static_router + local rs = router.replicasets[rs_uuid] + + -- And we change the prioritized replica. + ivshard.router.internal.errinj.ERRINJ_FAILOVER_DELAY = false + t.helpers.retrying({timeout = iwait_timeout}, function() + router.failover_fiber:wakeup() + t.assert_equals(rs.replica.uuid, master_uuid) + end) + + -- Check, that prioritized replica is not changed, as it's still broken. + rawset(_G, 'old_up_timeout', ivconst.FAILOVER_UP_TIMEOUT) + ivconst.FAILOVER_UP_TIMEOUT = 0.01 + ivtest.service_wait_for_new_ok(router.failover_service, + {on_yield = router.failover_fiber:wakeup()}) + t.assert_equals(rs.replica.uuid, master_uuid) + end, {g.replica_1_b:replicaset_uuid(), g.replica_1_a:instance_uuid()}) + + -- Restore 'info' request. + g.replica_1_b:exec(function() + ivshard.storage._call = _G.old_call + _G.old_call = nil + end) + + -- As replica_1_b has higher priority, it should be restored automatically. + g.router:exec(function(rs_uuid, replica_uuid) + local router = ivshard.router.internal.static_router + local rs = router.replicasets[rs_uuid] + t.assert_equals(rs.priority_list[1].uuid, replica_uuid) + t.helpers.retrying({timeout = iwait_timeout}, function() + router.failover_fiber:wakeup() + t.assert_equals(rs.replica.uuid, replica_uuid, + 'Prioritized replica is not up yet') + end) + end, {g.replica_1_b:replicaset_uuid(), g.replica_1_b:instance_uuid()}) + + vtest.router_cfg(g.router, global_cfg) + g.router:exec(function() + ivconst.FAILOVER_UP_TIMEOUT = _G.old_up_timeout + _G.old_up_timeout = nil + end) +end + +-- +-- gh-483: user calls also affects priority. If several sequential requests +-- fail, then the same logic as in the previous test happens. +-- +g.test_failed_calls_affect_priority = function() + prepare_affect_priority_rs(g) + local timeout = vconsts.CALL_TIMEOUT_MIN * 4 + + -- Find prioritized replica and disable failover for now. + g.router:exec(function(rs_uuid, replica_uuid) + local router = ivshard.router.internal.static_router + local rs = router.replicasets[rs_uuid] + local opts = {timeout = iwait_timeout} + rs:wait_connected_all(opts) + + t.helpers.retrying(opts, function() + router.failover_fiber:wakeup() + t.assert_equals(rs.replica.uuid, replica_uuid, + 'Prioritized replica have not been set yet') + end) + + local errinj = ivshard.router.internal.errinj + errinj.ERRINJ_FAILOVER_DELAY = true + t.helpers.retrying(opts, function() + router.failover_fiber:wakeup() + t.assert_equals(errinj.ERRINJ_FAILOVER_DELAY, 'in', + 'Failover have not been stopped yet') + end) + + -- Discovery is disabled, as it may affect `net_sequential_fail` + -- and leads to flakiness of the test. + errinj.ERRINJ_LONG_DISCOVERY = true + t.helpers.retrying(opts, function() + router.discovery_fiber:wakeup() + t.assert_equals(errinj.ERRINJ_LONG_DISCOVERY, 'waiting', + 'Discovery have not been stopped yet') + end) + end, {g.replica_1_b:replicaset_uuid(), g.replica_1_b:instance_uuid()}) + + -- Break 'info' request on replica so that it fails with TimedOut error. + -- No other request can be broken, as only failover changes priority and + -- as soon as it wakes up it succeeds with `_call` and sets + -- `net_sequential_fail` to 0. + g.replica_1_b:exec(function() + rawset(_G, 'old_call', ivshard.storage._call) + ivshard.storage._call = function(service_name, ...) + if service_name == 'info' then + ifiber.sleep(ivconst.CALL_TIMEOUT_MIN * 5) + end + return _G.old_call(service_name, ...) + end + end) + + affect_priority_clear_net_timeout(g) + local bid = vtest.storage_first_bucket(g.replica_1_a) + g.router:exec(function(bid, timeout, rs_uuid, replica_uuid) + local router = ivshard.router.internal.static_router + local replica = router.replicasets[rs_uuid].replica + t.assert_equals(replica.uuid, replica_uuid) + + local fails = replica.net_sequential_fail + for _ = 1, ivconst.FAILOVER_DOWN_SEQUENTIAL_FAIL do + local ok, err = ivshard.router.callro(bid, 'vshard.storage._call', + {'info'}, {timeout = timeout}) + t.assert_not(ok) + t.assert(iverror.is_timeout(err)) + end + + t.assert_equals(replica.net_sequential_fail, + fails + ivconst.FAILOVER_DOWN_SEQUENTIAL_FAIL) + + -- Priority is changed only by failover. So, the prioritized replica + -- is still the failing one. + t.assert_equals(router.replicasets[rs_uuid].replica.uuid, replica_uuid) + end, {bid, timeout, g.replica_1_b:replicaset_uuid(), + g.replica_1_b:instance_uuid()}) + + -- Enable failover, which changes priority of the replica. + g.router:exec(function(rs_uuid, master_uuid) + local router = ivshard.router.internal.static_router + ivshard.router.internal.errinj.ERRINJ_FAILOVER_DELAY = false + t.helpers.retrying({timeout = iwait_timeout}, function() + router.failover_fiber:wakeup() + t.assert_equals(router.replicasets[rs_uuid].replica.uuid, + master_uuid, 'Master is not prioritized yet') + end) + end, {g.replica_1_b:replicaset_uuid(), g.replica_1_a:instance_uuid()}) + + -- Restore 'info' request. + g.replica_1_b:exec(function() + ivshard.storage._call = _G.old_call + _G.old_call = nil + end) + + -- As replica_1_b has higher priority, it should be restored automatically. + g.router:exec(function(rs_uuid, replica_uuid) + local old_up_timeout = ivconst.FAILOVER_UP_TIMEOUT + ivconst.FAILOVER_UP_TIMEOUT = 1 + local router = ivshard.router.internal.static_router + local rs = router.replicasets[rs_uuid] + t.assert_equals(rs.priority_list[1].uuid, replica_uuid) + t.helpers.retrying({timeout = iwait_timeout}, function() + router.failover_fiber:wakeup() + t.assert_equals(rs.replica.uuid, replica_uuid, + 'Prioritized replica is not up yet') + end) + ivconst.FAILOVER_UP_TIMEOUT = old_up_timeout + end, {g.replica_1_b:replicaset_uuid(), g.replica_1_b:instance_uuid()}) + + vtest.router_cfg(g.router, global_cfg) +end diff --git a/test/router/exponential_timeout.result b/test/router/exponential_timeout.result index 252b816c..bd62b739 100644 --- a/test/router/exponential_timeout.result +++ b/test/router/exponential_timeout.result @@ -28,9 +28,10 @@ util.map_evals(test_run, {REPLICASET_1, REPLICASET_2}, 'bootstrap_storage(\'memt _ = test_run:cmd("create server router_1 with script='router/router_1.lua'") --- ... --- Discovery algorithm changes sometimes and should not affect the +-- Discovery algorithm and failover changes sometimes and should not affect the -- exponential timeout test. -_ = test_run:cmd("start server router_1 with args='discovery_disable'") +_ = test_run:cmd("start server router_1 with " .. \ + "args='discovery_disable failover_disable'") --- ... _ = test_run:switch('router_1') @@ -103,7 +104,7 @@ util.collect_timeouts(rs1) - - fail: 0 ok: 0 timeout: 0.5 - - fail: 1 + - fail: 2 ok: 0 timeout: 1 ... diff --git a/test/router/exponential_timeout.test.lua b/test/router/exponential_timeout.test.lua index 881b9a7e..ab677d60 100644 --- a/test/router/exponential_timeout.test.lua +++ b/test/router/exponential_timeout.test.lua @@ -10,9 +10,10 @@ util.wait_master(test_run, REPLICASET_1, 'storage_1_a') util.wait_master(test_run, REPLICASET_2, 'storage_2_a') util.map_evals(test_run, {REPLICASET_1, REPLICASET_2}, 'bootstrap_storage(\'memtx\')') _ = test_run:cmd("create server router_1 with script='router/router_1.lua'") --- Discovery algorithm changes sometimes and should not affect the +-- Discovery algorithm and failover changes sometimes and should not affect the -- exponential timeout test. -_ = test_run:cmd("start server router_1 with args='discovery_disable'") +_ = test_run:cmd("start server router_1 with " .. \ + "args='discovery_disable failover_disable'") _ = test_run:switch('router_1') util = require('util') diff --git a/test/router/retry_reads.result b/test/router/retry_reads.result index 80834750..e057422f 100644 --- a/test/router/retry_reads.result +++ b/test/router/retry_reads.result @@ -28,9 +28,10 @@ util.map_evals(test_run, {REPLICASET_1, REPLICASET_2}, 'bootstrap_storage(\'memt _ = test_run:cmd("create server router_1 with script='router/router_1.lua'") --- ... --- Discovery algorithm changes sometimes and should not affect the --- read retry decisions. -_ = test_run:cmd("start server router_1 with args='discovery_disable'") +-- Discovery algorithm and failover changes sometimes and should not affect the +-- exponential timeout test. +_ = test_run:cmd("start server router_1 with " .. \ + "args='discovery_disable failover_disable'") --- ... _ = test_run:switch('router_1') @@ -69,7 +70,7 @@ util.collect_timeouts(rs1) - - fail: 0 ok: 0 timeout: 0.5 - - fail: 1 + - fail: 2 ok: 0 timeout: 1 ... diff --git a/test/router/retry_reads.test.lua b/test/router/retry_reads.test.lua index f0a44457..a9554d77 100644 --- a/test/router/retry_reads.test.lua +++ b/test/router/retry_reads.test.lua @@ -10,9 +10,10 @@ util.wait_master(test_run, REPLICASET_1, 'storage_1_a') util.wait_master(test_run, REPLICASET_2, 'storage_2_a') util.map_evals(test_run, {REPLICASET_1, REPLICASET_2}, 'bootstrap_storage(\'memtx\')') _ = test_run:cmd("create server router_1 with script='router/router_1.lua'") --- Discovery algorithm changes sometimes and should not affect the --- read retry decisions. -_ = test_run:cmd("start server router_1 with args='discovery_disable'") +-- Discovery algorithm and failover changes sometimes and should not affect the +-- exponential timeout test. +_ = test_run:cmd("start server router_1 with " .. \ + "args='discovery_disable failover_disable'") _ = test_run:switch('router_1') util = require('util') diff --git a/test/router/router.result b/test/router/router.result index 4389367f..8f678564 100644 --- a/test/router/router.result +++ b/test/router/router.result @@ -1527,7 +1527,8 @@ table.sort(error_messages) ... error_messages --- -- - Use replica:check_is_connected(...) instead of replica.check_is_connected(...) +- - Use replica:call(...) instead of replica.call(...) + - Use replica:check_is_connected(...) instead of replica.check_is_connected(...) - Use replica:detach_conn(...) instead of replica.detach_conn(...) - Use replica:is_connected(...) instead of replica.is_connected(...) - Use replica:safe_uri(...) instead of replica.safe_uri(...) diff --git a/test/router/router_1.lua b/test/router/router_1.lua deleted file mode 120000 index da63b08a..00000000 --- a/test/router/router_1.lua +++ /dev/null @@ -1 +0,0 @@ -../../example/router_1.lua \ No newline at end of file diff --git a/test/router/router_1.lua b/test/router/router_1.lua new file mode 100755 index 00000000..887c43bc --- /dev/null +++ b/test/router/router_1.lua @@ -0,0 +1,35 @@ +#!/usr/bin/env tarantool + +require('strict').on() +fiber = require('fiber') + +-- Check if we are running under test-run +if os.getenv('ADMIN') then + test_run = require('test_run').new() + require('console').listen(os.getenv('ADMIN')) +end + +replicasets = {'cbf06940-0790-498b-948d-042b62cf3d29', + 'ac522f65-aa94-4134-9f64-51ee384f1a54'} + +-- Call a configuration provider +cfg = dofile('localcfg.lua') +if arg[1] == 'discovery_disable' then + cfg.discovery_mode = 'off' +end + +-- Start the database with sharding +vshard = require('vshard') + +if arg[2] == 'failover_disable' then + vshard.router.internal.errinj.ERRINJ_FAILOVER_DELAY = true +end + +vshard.router.cfg(cfg) + +if arg[2] == 'failover_disable' then + while vshard.router.internal.errinj.ERRINJ_FAILOVER_DELAY ~= 'in' do + router.failover_fiber:wakeup() + fiber.sleep(0.01) + end +end diff --git a/vshard/consts.lua b/vshard/consts.lua index 249d4915..f8bedc4b 100644 --- a/vshard/consts.lua +++ b/vshard/consts.lua @@ -49,6 +49,7 @@ return { CALL_TIMEOUT_MAX = 64; FAILOVER_UP_TIMEOUT = 5; FAILOVER_DOWN_TIMEOUT = 1; + FAILOVER_DOWN_SEQUENTIAL_FAIL = 3; DEFAULT_FAILOVER_PING_TIMEOUT = 5; DEFAULT_SYNC_TIMEOUT = 1; RECONNECT_TIMEOUT = 0.5; diff --git a/vshard/replicaset.lua b/vshard/replicaset.lua index d38ef49a..abdffe67 100644 --- a/vshard/replicaset.lua +++ b/vshard/replicaset.lua @@ -483,8 +483,10 @@ end -- local function replicaset_down_replica_priority(replicaset) local old_replica = replicaset.replica - assert(old_replica and old_replica.down_ts and - not old_replica:is_connected()) + assert(old_replica and ((old_replica.down_ts and + not old_replica:is_connected()) or + old_replica.net_sequential_fail >= + consts.FAILOVER_DOWN_SEQUENTIAL_FAIL)) local new_replica = old_replica.next_by_priority if new_replica then assert(new_replica ~= old_replica) @@ -511,7 +513,8 @@ local function replicaset_up_replica_priority(replicaset) -- Failed to up priority. return end - if replica:is_connected() then + if replica:is_connected() and replica.net_sequential_ok > 0 then + assert(replica.net_sequential_fail == 0) replicaset.replica = replica assert(not old_replica or old_replica.weight >= replicaset.replica.weight) @@ -527,15 +530,12 @@ end -- local function replica_on_failed_request(replica) replica.net_sequential_ok = 0 - local val = replica.net_sequential_fail + 1 - if val >= 2 then + replica.net_sequential_fail = replica.net_sequential_fail + 1 + if replica.net_sequential_fail >= 2 then local new_timeout = replica.net_timeout * 2 if new_timeout <= consts.CALL_TIMEOUT_MAX then replica.net_timeout = new_timeout end - replica.net_sequential_fail = 1 - else - replica.net_sequential_fail = val end end @@ -1268,6 +1268,7 @@ local replica_mt = { return util.uri_format(uri) end, detach_conn = replica_detach_conn, + call = replica_call, }, __tostring = function(replica) if replica.name then diff --git a/vshard/router/init.lua b/vshard/router/init.lua index 6ac80b79..cbd978ef 100644 --- a/vshard/router/init.lua +++ b/vshard/router/init.lua @@ -44,6 +44,7 @@ if not M then ERRINJ_CFG = false, ERRINJ_CFG_DELAY = false, ERRINJ_FAILOVER_CHANGE_CFG = false, + ERRINJ_FAILOVER_DELAY = false, ERRINJ_RELOAD = false, ERRINJ_LONG_DISCOVERY = false, ERRINJ_MASTER_SEARCH_DELAY = false, @@ -973,41 +974,26 @@ end -- Failover -------------------------------------------------------------------------------- -local function failover_ping_round(router) - for _, replicaset in pairs(router.replicasets) do - local replica = replicaset.replica - if replica ~= nil and replica.conn ~= nil and - replica.down_ts == nil then - if not replica.conn:ping({timeout = - router.failover_ping_timeout}) then - log.info('Ping error from %s: perhaps a connection is down', - replica) - -- Connection hangs. Recreate it to be able to - -- fail over to a replica next by priority. The - -- old connection is not closed in case if it just - -- processes too big response at this moment. Any - -- way it will be eventually garbage collected - -- and closed. - replica:detach_conn() - replicaset:connect_replica(replica) - end - end - end -end - -- -- Replicaset must fall its replica connection to lower priority, -- if the current one is down too long. -- local function failover_need_down_priority(replicaset, curr_ts) local r = replicaset.replica + if not r or not r.next_by_priority then + return false + end -- down_ts not nil does not mean that the replica is not -- connected. Probably it is connected and now fetches schema, -- or does authorization. Either case, it is healthy, no need -- to down the prio. - return r and r.down_ts and not r:is_connected() and - curr_ts - r.down_ts >= consts.FAILOVER_DOWN_TIMEOUT - and r.next_by_priority + local is_down_ts = r.down_ts and not r:is_connected() and + curr_ts - r.down_ts >= consts.FAILOVER_DOWN_TIMEOUT + -- If we failed several sequential requests to replica, then something + -- is wrong with it. Temporary lower its priority. + local is_sequential_fails = + r.net_sequential_fail >= consts.FAILOVER_DOWN_SEQUENTIAL_FAIL + return is_down_ts or is_sequential_fails end -- @@ -1035,6 +1021,49 @@ local function failover_collect_to_update(router) return id_to_update end +local function failover_ping(replica, opts) + return replica:call('vshard.storage._call', {'info'}, opts) +end + +local function failover_ping_round(router, curr_ts) + local opts = {timeout = router.failover_ping_timeout} + for _, replicaset in pairs(router.replicasets) do + local replica = replicaset.replica + if failover_need_up_priority(replicaset, curr_ts) then + -- When its time to increase priority in replicaset, all instances, + -- priority of which are higher than the current one,are pinged so + -- that we know, which connections are working properly. + for _, r in pairs(replicaset.priority_list) do + if r == replica then + break + end + if r.conn ~= nil and r.down_ts == nil then + -- We don't need return values, r.net_sequential_ok is + -- incremented on succcessful request. + failover_ping(r, opts) + end + end + end + if replica ~= nil and replica.conn ~= nil and + replica.down_ts == nil then + local net_status, _, err = failover_ping(replica, opts) + if not net_status then + log.info('Ping error from %s: perhaps a connection is down: %s', + replica, err) + -- Connection hangs. Recreate it to be able to + -- fail over to a replica next by priority. The + -- old connection is not closed in case if it just + -- processes too big response at this moment. Any + -- way it will be eventually garbage collected + -- and closed. + replica:detach_conn() + replicaset:connect_replica(replica) + end + end + end +end + + -- -- Detect not optimal or disconnected replicas. For not optimal -- try to update them to optimal, and down priority of @@ -1042,12 +1071,12 @@ end -- @retval true A replica of an replicaset has been changed. -- local function failover_step(router) - failover_ping_round(router) + local curr_ts = fiber_clock() + failover_ping_round(router, curr_ts) local id_to_update = failover_collect_to_update(router) if #id_to_update == 0 then return false end - local curr_ts = fiber_clock() local replica_is_changed = false for _, id in pairs(id_to_update) do local rs = router.replicasets[id] @@ -1105,6 +1134,12 @@ local function failover_service_f(router, service) -- each min_timeout seconds. local prev_was_ok = false while module_version == M.module_version do + if M.errinj.ERRINJ_FAILOVER_DELAY then + M.errinj.ERRINJ_FAILOVER_DELAY = 'in' + repeat + lfiber.sleep(0.001) + until not M.errinj.ERRINJ_FAILOVER_DELAY + end service:next_iter() service:set_activity('updating replicas') local ok, replica_is_changed = pcall(failover_step, router)