storage: introduce no-activity timeout for conns
Automatic master discovery on the storages means that the
instances are going to establish more connections between
replicasets to locate each other's masters.

That might easily lead to a full mesh of the whole cluster: lots
of connections, the majority of which are not even used most of
the time. Such connections need to be dropped if they stay unused
for a while.

This commit makes the storages drop connections that stay unused
for too long. The automatically discovered master locations are
also erased by the same no-activity timeout. The logic behind it
is that if an instance wasn't contacted in any way for a very long
time, it might not be a master anymore. It is better to stop
treating it as a master and let it be re-discovered when a master
connection is needed again.

Follow-up #429
Follow-up #430

NO_DOC=internal
Gerold103 committed Nov 15, 2023
1 parent 1cd2873 commit a0d7ddc
Showing 5 changed files with 123 additions and 18 deletions.
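
The whole patch hinges on one rule, which the files below implement: a replica's connection is evicted once its last-use timestamp falls behind REPLICA_NOACTIVITY_TIMEOUT. A minimal sketch of that rule in plain Lua (simplified names; the real walk over all replicasets is conn_manager_collect_idle_conns() in vshard/storage/init.lua below):

    local fiber = require('fiber')
    local consts = require('vshard.consts')

    -- Sketch only: vshard inlines this check rather than using a helper
    -- like is_idle().
    local function is_idle(replica)
        -- Idle means: there is a connection, a last-use timestamp was ever
        -- recorded, and that timestamp is older than the no-activity
        -- timeout (5 minutes by default, see vshard/consts.lua below).
        return replica.conn ~= nil and replica.activity_ts ~= nil and
            replica.activity_ts + consts.REPLICA_NOACTIVITY_TIMEOUT <
            fiber.clock()
    end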
58 changes: 58 additions & 0 deletions test/storage-luatest/auto_master_2_2_2_test.lua
@@ -229,3 +229,61 @@ test_group.test_recovery = function(g)
         _G.bucket_gc_wait()
     end)
 end
+
+test_group.test_noactivity_timeout_for_auto_master = function(g)
+    local bid1, bid2 = g.replica_1_a:exec(function(uuid)
+        local bid1 = _G.get_first_bucket()
+        --
+        -- Use the master connection for anything.
+        --
+        local ok, err = ivshard.storage.bucket_send(bid1, uuid,
+                                                    {timeout = iwait_timeout})
+        ilt.assert_equals(err, nil)
+        ilt.assert(ok)
+        ilt.assert_not_equals(
+            ivshard.storage.internal.replicasets[uuid].master, nil)
+        --
+        -- Wait for the noactivity timeout to expire. The connection must be
+        -- dropped. The master role is reset, because it is automatic and wasn't
+        -- validated for too long time. Might be outdated already. Better let it
+        -- be re-discovered again when needed.
+        --
+        local old_timeout = ivconst.REPLICA_NOACTIVITY_TIMEOUT
+        ivconst.REPLICA_NOACTIVITY_TIMEOUT = 0.01
+        ifiber.sleep(ivconst.REPLICA_NOACTIVITY_TIMEOUT)
+        ivtest.service_wait_for_new_ok(
+            ivshard.storage.internal.conn_manager_service,
+            {on_yield = function()
+                ivshard.storage.internal.conn_manager_fiber:wakeup()
+            end})
+        ilt.assert_equals(
+            ivshard.storage.internal.replicasets[uuid].master, nil)
+        ivconst.REPLICA_NOACTIVITY_TIMEOUT = old_timeout
+        --
+        -- Re-discover the connection and the master role.
+        --
+        local bid2 = _G.get_first_bucket()
+        ok, err = ivshard.storage.bucket_send(bid2, uuid,
+                                              {timeout = iwait_timeout})
+        ilt.assert_equals(err, nil)
+        ilt.assert(ok)
+        ilt.assert_not_equals(
+            ivshard.storage.internal.replicasets[uuid].master, nil)
+        _G.bucket_gc_wait()
+        return bid1, bid2
+    end, {g.replica_2_a:replicaset_uuid()})
+    --
+    -- Cleanup.
+    --
+    g.replica_2_a:exec(function(uuid, bid1, bid2)
+        local ok, err = ivshard.storage.bucket_send(bid1, uuid,
+                                                    {timeout = iwait_timeout})
+        ilt.assert_equals(err, nil)
+        ilt.assert(ok)
+        ok, err = ivshard.storage.bucket_send(bid2, uuid,
+                                              {timeout = iwait_timeout})
+        ilt.assert_equals(err, nil)
+        ilt.assert(ok)
+        _G.bucket_gc_wait()
+    end, {g.replica_1_a:replicaset_uuid(), bid1, bid2})
+end
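
The waiting pattern in this test is reused below: shrink REPLICA_NOACTIVITY_TIMEOUT, sleep past it, then drive the connection manager by hand. The on_yield callback wakes conn_manager_fiber so that service_wait_for_new_ok() observes a fresh successful iteration instead of waiting out the manager's regular schedule.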
36 changes: 36 additions & 0 deletions test/storage-luatest/storage_1_1_test.lua
@@ -494,3 +494,39 @@ test_group.test_master_exclusive_api = function(g)
 
     vtest.cluster_cfg(g, global_cfg)
 end
+
+test_group.test_noactivity_timeout_for_explicit_master = function(g)
+    local bid = g.replica_1_a:exec(function(uuid)
+        local bid = _G.get_first_bucket()
+        local ok, err = ivshard.storage.bucket_send(bid, uuid,
+                                                    {timeout = iwait_timeout})
+        ilt.assert_equals(err, nil)
+        ilt.assert(ok)
+        local master = ivshard.storage.internal.replicasets[uuid].master
+        ilt.assert_not_equals(master, nil)
+        ilt.assert_not_equals(master.conn, nil)
+
+        local old_timeout = ivconst.REPLICA_NOACTIVITY_TIMEOUT
+        ivconst.REPLICA_NOACTIVITY_TIMEOUT = 0.01
+        ifiber.sleep(ivconst.REPLICA_NOACTIVITY_TIMEOUT)
+        ivtest.service_wait_for_new_ok(
+            ivshard.storage.internal.conn_manager_service,
+            {on_yield = function()
+                ivshard.storage.internal.conn_manager_fiber:wakeup()
+            end})
+        master = ivshard.storage.internal.replicasets[uuid].master
+        ilt.assert_not_equals(master, nil)
+        ilt.assert_equals(master.conn, nil)
+        ivconst.REPLICA_NOACTIVITY_TIMEOUT = old_timeout
+        _G.bucket_gc_wait()
+        return bid
+    end, {g.replica_2_a:replicaset_uuid()})
+
+    g.replica_2_a:exec(function(uuid, bid)
+        local ok, err = ivshard.storage.bucket_send(bid, uuid,
+                                                    {timeout = iwait_timeout})
+        ilt.assert_equals(err, nil)
+        ilt.assert(ok)
+        _G.bucket_gc_wait()
+    end, {g.replica_1_a:replicaset_uuid(), bid})
+end
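
Taken together, the two tests pin down an asymmetry: an idle connection is always dropped, but the master role is reset only when it was discovered automatically. For an explicitly configured master the role survives and only master.conn becomes nil. The decisive check, quoted from conn_manager_collect_idle_conns() in vshard/storage/init.lua below:

    if replica == rs.master and rs.is_master_auto then
        rs.master = nil
    end
    replica.conn = nil
    c:close()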
1 change: 1 addition & 0 deletions vshard/consts.lua
@@ -59,6 +59,7 @@ return {
     RECOVERY_BACKOFF_INTERVAL = 5,
     RECOVERY_GET_STAT_TIMEOUT = 5,
     REPLICA_BACKOFF_INTERVAL = 5,
+    REPLICA_NOACTIVITY_TIMEOUT = 60 * 5,
     DEFAULT_BUCKET_SEND_TIMEOUT = 10,
     DEFAULT_BUCKET_RECV_TIMEOUT = 10,

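The five-minute default is deliberately generous. Tests shrink it at runtime through the module table, as both test files above do; a minimal sketch, assuming a conn manager iteration is triggered and awaited while the short value is in effect:

    local consts = require('vshard.consts')
    local saved = consts.REPLICA_NOACTIVITY_TIMEOUT
    consts.REPLICA_NOACTIVITY_TIMEOUT = 0.01 -- make eviction near-immediate
    -- ... trigger and await a conn manager iteration here ...
    consts.REPLICA_NOACTIVITY_TIMEOUT = saved
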
3 changes: 3 additions & 0 deletions vshard/replicaset.lua
@@ -14,6 +14,7 @@
 --          down_ts = <timestamp of disconnect from the
 --                     replica>,
 --          backoff_ts = <timestamp when was sent into backoff state>,
+--          activity_ts = <timestamp when the replica was used last time>,
 --          backoff_err = <error object caused the backoff>,
 --          net_timeout = <current network timeout for calls,
 --                         doubled on each network fail until
@@ -208,6 +209,7 @@ end
 -- until a connection is established.
 --
 local function replicaset_connect_to_replica(replicaset, replica)
+    replica.activity_ts = fiber_clock()
     local conn = replica.conn
     if not conn or netbox_is_conn_dead(conn) then
         conn = netbox.connect(replica.uri, {
@@ -413,6 +415,7 @@ end
 --
 local function replica_call(replica, func, args, opts)
     assert(opts and opts.timeout)
+    replica.activity_ts = fiber_clock()
     local conn = replica.conn
     local net_status, storage_status, retval, error_object =
         pcall(conn.call, conn, func, args, opts)
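These are the two write points for activity_ts visible in this patch: connecting to a replica and calling it both count as activity, so any traffic through replica_call() keeps the connection off the eviction list. A condensed view (touch() is a hypothetical helper for illustration; vshard inlines the stamp):

    local fiber_clock = require('fiber').clock

    local function touch(replica)
        -- Refresh the timestamp the connection manager later compares
        -- against REPLICA_NOACTIVITY_TIMEOUT.
        replica.activity_ts = fiber_clock()
    end
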
43 changes: 25 additions & 18 deletions vshard/storage/init.lua
@@ -3595,10 +3595,34 @@ local function conn_manager_locate_masters(service)
     return is_all_done
 end
 
+local function conn_manager_collect_idle_conns()
+    local ts = fiber_clock()
+    local count = 0
+    for _, rs in pairs(M.replicasets) do
+        for _, replica in pairs(rs.replicas) do
+            local c = replica.conn
+            if c and replica.activity_ts and
+               replica.activity_ts + consts.REPLICA_NOACTIVITY_TIMEOUT < ts then
+                if replica == rs.master and rs.is_master_auto then
+                    assert(rs ~= M.this_replicaset)
+                    rs.master = nil
+                end
+                replica.conn = nil
+                c:close()
+                count = count + 1
+            end
+        end
+    end
+    if count > 0 then
+        log.info('Closed %s unused connections', count)
+    end
+end
+
 local function conn_manager_service_f(service)
     local module_version = M.module_version
     while module_version == M.module_version do
         service:next_iter()
+        conn_manager_collect_idle_conns()
         local timeout
         service:set_activity('master discovery')
         lfiber.testcancel()
@@ -3624,30 +3648,13 @@ local function conn_manager_f()
     M.conn_manager_service = nil
 end
 
-local function conn_manager_service_start()
+local function conn_manager_update()
     if not M.conn_manager_fiber then
         M.conn_manager_fiber = util.reloadable_fiber_create(
             'vshard.conn_man', M, 'conn_manager_f')
     end
 end
 
-local function conn_manager_service_stop()
-    if M.conn_manager_fiber then
-        M.conn_manager_fiber:cancel()
-        M.conn_manager_fiber = nil
-    end
-end
-
-local function conn_manager_update()
-    for _, rs in pairs(M.replicasets) do
-        if rs.is_master_auto then
-            conn_manager_service_start()
-            return
-        end
-    end
-    conn_manager_service_stop()
-end
-
 --------------------------------------------------------------------------------
 -- Configuration
 --------------------------------------------------------------------------------
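
One consequence is easy to miss: before this hunk the manager fiber ran only while at least one replicaset had is_master_auto set and was cancelled otherwise; now conn_manager_update() unconditionally ensures the fiber exists and nothing stops it, since idle-connection collection is useful on every storage. From a test or console an eviction pass can be forced rather than awaited; a sketch relying on internal API that may change between versions:

    local internal = require('vshard.storage').internal
    if internal.conn_manager_fiber ~= nil then
        internal.conn_manager_fiber:wakeup()
    end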
