diff --git a/test/replicaset-luatest/replicaset_3_test.lua b/test/replicaset-luatest/replicaset_3_test.lua index 19b4776f..8016bd96 100644 --- a/test/replicaset-luatest/replicaset_3_test.lua +++ b/test/replicaset-luatest/replicaset_3_test.lua @@ -3,6 +3,7 @@ local t = require('luatest') local vreplicaset = require('vshard.replicaset') local vtest = require('test.luatest_helpers.vtest') local verror = require('vshard.error') +local vutil = require('vshard.util') local small_timeout_opts = {timeout = 0.05} local timeout_opts = {timeout = vtest.wait_timeout} @@ -247,6 +248,7 @@ test_group.test_locate_master_when_no_conn_object = function(g) end test_group.test_named_replicaset = function(g) + t.run_only_if(vutil.feature.persistent_names) local new_cfg_template = table.deepcopy(cfg_template) new_cfg_template.identification_mode = 'name_as_key' new_cfg_template.sharding['replicaset'] = new_cfg_template.sharding[1] @@ -267,9 +269,15 @@ test_group.test_named_replicaset = function(g) t.assert_equals(rs.id, rs.name) t.assert_equals(replica_1_a.id, replica_1_a.name) - -- Just to be sure, that it works. + -- Name is not set, name mismatch error. + local ret, err = rs:callrw('get_uuid', {}, {timeout = 5}) + t.assert_equals(err.name, 'INSTANCE_NAME_MISMATCH') + t.assert_equals(ret, nil) + + -- Set name, everything works from now on. 
+ g.replica_1_a:exec(function() box.cfg{instance_name = 'replica_1_a'} end) local uuid_a = g.replica_1_a:instance_uuid() - local ret, err = rs:callrw('get_uuid', {}, timeout_opts) + ret, err = rs:callrw('get_uuid', {}, timeout_opts) t.assert_equals(err, nil) t.assert_equals(ret, uuid_a) diff --git a/test/replicaset-luatest/vconnect_test.lua b/test/replicaset-luatest/vconnect_test.lua new file mode 100644 index 00000000..b00ce872 --- /dev/null +++ b/test/replicaset-luatest/vconnect_test.lua @@ -0,0 +1,189 @@ +local fiber = require('fiber') +local t = require('luatest') +local vreplicaset = require('vshard.replicaset') +local vtest = require('test.luatest_helpers.vtest') +local vutil = require('vshard.util') +local verror = require('vshard.error') + +local small_timeout_opts = {timeout = 0.01} +local timeout_opts = {timeout = vtest.wait_timeout} + +local test_group = t.group('vconnect') + +local cfg_template = { + sharding = { + replicaset = { + replicas = { + replica = { + master = true, + }, + }, + }, + }, + bucket_count = 20, + test_user_grant_range = 'super', + replication_timeout = 0.1, + identification_mode = 'name_as_key', +} +local global_cfg + +test_group.before_all(function(g) + t.run_only_if(vutil.feature.persistent_names) + global_cfg = vtest.config_new(cfg_template) + vtest.cluster_new(g, global_cfg) + vtest.cluster_bootstrap(g, global_cfg) + vtest.cluster_wait_vclock_all(g) +end) + +test_group.after_all(function(g) + g.cluster:stop() +end) + +-- +-- Test, that conn_vconnect_wait fails to get correct +-- result. Connection should be closed. +-- +test_group.test_vconnect_no_result = function(g) + local _, rs = next(vreplicaset.buildall(global_cfg)) + g.replica:exec(function() + rawset(_G, '_call', ivshard.storage._call) + ivshard.storage._call = nil + end) + + -- Drop connection in order to make replicaset to recreate it. 
+ rs.master.conn = nil + local ret, err = rs:callrw('get_uuid', {}, timeout_opts) + t.assert_str_contains(err.message, "_call' is not defined") + t.assert_equals(ret, nil) + -- Critical error, connection should be closed. + t.assert_equals(rs.master.conn.state, 'closed') + + g.replica:exec(function() + ivshard.storage._call = _G._call + end) +end + +-- +-- Test, that conn_vconnect_wait fails, when future is nil. +-- +test_group.test_vconnect_no_future = function(g) + local _, rs = next(vreplicaset.buildall(global_cfg)) + g.replica:exec(function() + rawset(_G, '_call', ivshard.storage._call) + rawset(_G, 'do_sleep', true) + -- Future should not appear at all. + ivshard.storage._call = function() + while _G.do_sleep do + ifiber.sleep(0.1) + end + end + end) + + rs.master.conn = nil + local ret, err = rs:callrw('get_uuid', {}, small_timeout_opts) + t.assert(verror.is_timeout(err)) + t.assert_equals(ret, nil) + t.assert_not_equals(rs.master.conn.state, 'closed') + + g.replica:exec(function() + _G.do_sleep = false + ivshard.storage._call = _G._call + end) +end + +-- +-- Test, that conn_vconnect_check fails, when future's result is nil. +-- +test_group.test_vconnect_check_no_future = function(g) + local _, rs = next(vreplicaset.buildall(global_cfg)) + g.replica:exec(function() + rawset(_G, '_call', ivshard.storage._call) + ivshard.storage._call = nil + end) + + rs.master.conn = nil + local opts = table.deepcopy(timeout_opts) + opts.is_async = true + t.helpers.retrying({}, function() + -- It may be VHANDSHAKE_NOT_COMPLETE error, when future + -- is not ready. But at the end it must be the actual error. + local ret, err = rs:callrw('get_uuid', {}, opts) + t.assert_str_contains(err.message, "_call' is not defined") + t.assert_equals(ret, nil) + t.assert_equals(rs.master.conn.state, 'closed') + end) + + g.replica:exec(function() + ivshard.storage._call = _G._call + end) +end + +-- +-- 1. Change name and stop replica. +-- 2. Wait for error_reconnect timeout. +-- 3. 
Assert, that on reconnect name change is noticed. +-- +test_group.test_vconnect_on_reconnect = function(g) + local _, rs = next(vreplicaset.buildall(global_cfg)) + t.assert_not_equals(rs:connect_master(), nil) + -- Configuration to use after restart. + local new_cfg = table.deepcopy(global_cfg) + local cfg_rs = new_cfg.sharding.replicaset + cfg_rs.replicas.bad = cfg_rs.replicas.replica + cfg_rs.replicas.replica = nil + + g.replica:exec(function() + box.cfg{instance_name = 'bad', force_recovery = true} + end) + g.replica:stop() + t.helpers.retrying({}, function() + t.assert_equals(rs.master.conn.state, 'error_reconnect') + end) + + -- Replica cannot be started with incorrect name, change box.cfg. + g.replica.box_cfg.instance_name = 'bad' + g.replica:start() + vtest.cluster_cfg(g, new_cfg) + local ret, err = rs:callrw('get_uuid', {}, timeout_opts) + t.assert_equals(err.name, 'INSTANCE_NAME_MISMATCH') + t.assert_equals(ret, nil) + t.assert_equals(rs.master.conn.state, 'closed') + + g.replica:exec(function() + box.cfg{instance_name = 'replica', force_recovery = true} + end) + vtest.cluster_cfg(g, global_cfg) +end + +-- +-- Test, that async call doesn't yield and immediately fails. +-- +test_group.test_async_no_yield = function(g) + local _, rs = next(vreplicaset.buildall(global_cfg)) + g.replica:exec(function() + rawset(_G, '_call', ivshard.storage._call) + rawset(_G, 'do_sleep', true) + -- Future should not appear at all. + ivshard.storage._call = function() + while _G.do_sleep do + ifiber.sleep(0.1) + end + end + end) + + local opts = table.deepcopy(timeout_opts) + opts.is_async = true + local csw1 = fiber.self():csw() + local ret, err = rs:callrw('get_uuid', {}, opts) + local csw2 = fiber.self():csw() + -- Waiting for #456 to be fixed. 
+ t.assert_equals(csw2, csw1 + 1) + t.assert_str_contains(err.name, 'VHANDSHAKE_NOT_COMPLETE') + t.assert_equals(ret, nil) + t.assert_not_equals(rs.master.conn.state, 'closed') + + g.replica:exec(function() + _G.do_sleep = false + ivshard.storage._call = _G._call + end) +end diff --git a/test/storage/storage.result b/test/storage/storage.result index 37c6ac78..8cd1cdf4 100644 --- a/test/storage/storage.result +++ b/test/storage/storage.result @@ -1014,16 +1014,16 @@ vshard.storage.internal.errinj.ERRINJ_RECOVERY_PAUSE = false -- -- Internal info function. -- -vshard.storage._call('info') +vshard.storage._call('info').is_master --- -- is_master: true +- true ... _ = test_run:switch('storage_1_b') --- ... -vshard.storage._call('info') +vshard.storage._call('info').is_master --- -- is_master: false +- false ... -- -- gh-123, gh-298: storage auto-enable/disable depending on instance state. diff --git a/test/storage/storage.test.lua b/test/storage/storage.test.lua index 8fd8cd26..6c3b5515 100644 --- a/test/storage/storage.test.lua +++ b/test/storage/storage.test.lua @@ -327,9 +327,9 @@ vshard.storage.internal.errinj.ERRINJ_RECOVERY_PAUSE = false -- -- Internal info function. -- -vshard.storage._call('info') +vshard.storage._call('info').is_master _ = test_run:switch('storage_1_b') -vshard.storage._call('info') +vshard.storage._call('info').is_master -- -- gh-123, gh-298: storage auto-enable/disable depending on instance state. 
diff --git a/vshard/error.lua b/vshard/error.lua index 3ab0832e..1d78d7e2 100644 --- a/vshard/error.lua +++ b/vshard/error.lua @@ -202,6 +202,16 @@ local error_message_template = { msg = 'Bucket %s update is invalid: %s', args = {'bucket_id', 'reason'}, }, + [40] = { + name = 'VHANDSHAKE_NOT_COMPLETE', + msg = 'Handshake with %s has not been completed yet', + args = {'replica'}, + }, + [41] = { + name = 'INSTANCE_NAME_MISMATCH', + msg = 'Mismatched server name: expected "%s", but got "%s"', + args = {'expected_name', 'actual_name'}, + }, } -- diff --git a/vshard/replicaset.lua b/vshard/replicaset.lua index b1e2defe..e213fc80 100644 --- a/vshard/replicaset.lua +++ b/vshard/replicaset.lua @@ -75,6 +75,144 @@ local fiber_cond_wait = util.fiber_cond_wait local future_wait = util.future_wait local gsc = util.generate_self_checker +-- +-- vconnect is asynchronous vshard greeting, saved inside netbox connection. +-- It stores future object and additional info, needed for its work. +-- Future is initialized, when the connection is established (inside +-- netbox_on_connect). The connection cannot be considered "connected" +-- until vconnect is properly validated. +-- +local function conn_vconnect_set(conn) + assert(conn.replica ~= nil) + local is_named = conn.replica.id == conn.replica.name + if not is_named then + -- Nothing to do. Check is not needed. + return + end + -- Update existing vconnect. Fiber condition cannot be dropped, + -- somebody may already be waiting on it. + if conn.vconnect then + -- Connections are preserved during reconfiguration, + -- identification may be changed. + conn.vconnect.is_named = is_named + if not conn.vconnect.future then + return + end + -- Future object must be updated. Old result is irrelevant. + conn.vconnect.future:discard() + conn.vconnect.future = nil + return + end + -- Create new vconnect. + conn.vconnect = { + -- Whether the connection is done to the named replica. 
+ is_named = is_named, + -- Used to wait for the appearance of vconnect.future object, + -- if call is done before the connection is established. + future_cond = fiber.cond(), + } +end + +-- +-- Initialize future object. Should be done, when connection is established +-- (inside netbox_on_connect). +-- +local function conn_vconnect_start(conn) + local vconn = conn.vconnect + if not vconn then + -- Nothing to do. Check is not needed. + return + end + + local opts = {is_async = true} + vconn.future = conn:call('vshard.storage._call', {'info'}, opts) + vconn.future_cond:broadcast() +end + +-- +-- Check, that future is ready, and its result is expected. +-- The function doesn't yield. +-- @retval true; The correct response is received. +-- @retval nil, ...; Response is not received or validation error happened. +-- +local function conn_vconnect_check(conn) + local vconn = conn.vconnect + -- conn.vconnect may be nil, if connection was created on old version + -- and the storage was reloaded to a new one. It's also nil, when + -- all checks were already done. + if not vconn then + return true + end + -- Nothing to do, but wait in such case. + if not vconn.future or not vconn.future:is_ready() then + return nil, lerror.vshard(lerror.code.VHANDSHAKE_NOT_COMPLETE, + conn.replica.id) + end + -- Critical errors. Connection should be closed after these ones. + local result, err = vconn.future:result() + if not result then + -- Failed to get response. E.g. access error. + return nil, lerror.make(err) + end + if vconn.is_named and result[1].name ~= conn.replica.name then + return nil, lerror.vshard(lerror.code.INSTANCE_NAME_MISMATCH, + conn.replica.name, result[1].name) + end + -- Don't validate until reconnect happens. + conn.vconnect = nil + return true +end + +local function conn_vconnect_check_or_close(conn) + local ok, err = conn_vconnect_check(conn) + -- Close the connection, if error happened, but it is not + -- VHANDSHAKE_NOT_COMPLETE. 
+ if not ok and err and not (err.type == 'ShardingError' and + err.code == lerror.code.VHANDSHAKE_NOT_COMPLETE) then + conn:close() + end + return ok, err +end + +-- +-- Wait until the future object is ready. Returns remaining timeout. +-- @retval timeout; Future object is ready. +-- @retval nil, err; Timeout passed. +-- +local function conn_vconnect_wait(conn, timeout) + local vconn = conn.vconnect + -- Fast path. In most cases no validation should be done at all. + -- conn.vconnect may be nil, if connection was created on old version + -- and the storage was reloaded to a new one. It's also nil, when + -- all checks were already done. + if not vconn or (vconn.future and vconn.future:is_ready()) then + return timeout + end + local deadline = fiber_clock() + timeout + -- Wait for connection to be established. + if not vconn.future and + not fiber_cond_wait(vconn.future_cond, timeout) then + return nil, lerror.timeout() + end + timeout = deadline - fiber_clock() + -- Wait for async call to return. + local res, err = future_wait(vconn.future, timeout) + if res == nil then + -- Either timeout error or any other. If it's not a timeout error, + -- then conn must be recreated, handshake must be retried. + return nil, lerror.make(err) + end + return deadline - fiber_clock() +end + +local function conn_vconnect_wait_or_close(conn, timeout) + local ok, err = conn_vconnect_wait(conn, timeout) + if not ok and not lerror.is_timeout(err) then + conn:close() + end + return ok, err +end + -- -- on_connect() trigger for net.box -- @@ -86,6 +224,11 @@ local function netbox_on_connect(conn) -- If a replica's connection has revived, then unset -- replica.down_ts - it is not down anymore. replica.down_ts = nil + -- conn.vconnect is set on every connect, as it may be nil, + -- if previous check successfully passed. Also, the needed + -- checks may change on reconnect. 
+ conn_vconnect_set(conn) + conn_vconnect_start(conn) if replica.uuid and conn.peer_uuid ~= replica.uuid and -- XXX: Zero UUID means not a real Tarantool instance. It -- is likely to be a cartridge.remote-control server, @@ -222,6 +365,11 @@ local function replicaset_connect_to_replica(replicaset, replica) }) conn.replica = replica conn.replicaset = replicaset + -- vconnect must be set before the time, connection is established, + -- as we must know, that the connection cannot be used. If vconnect + -- is nil, it means all checks passed, so we may make a call and + -- only after that 'require' checks. + conn_vconnect_set(conn) conn.on_connect_ref = netbox_on_connect conn:on_connect(netbox_on_connect) conn.on_disconnect_ref = netbox_on_disconnect @@ -421,6 +569,21 @@ local function replica_call(replica, func, args, opts) assert(opts and opts.timeout) replica.activity_ts = fiber_clock() local conn = replica.conn + if not opts.is_async then + -- Async call cannot yield. So, we cannot wait for the connection + -- to be established and validate vconnect. Immediately fail below, + -- in conn_vconnect_check_or_close, if something is wrong. 
+ local timeout, err = conn_vconnect_wait_or_close(conn, opts.timeout) + if not timeout then + return false, nil, lerror.make(err) + end + opts.timeout = timeout + end + local ok, err = conn_vconnect_check_or_close(conn) + if not ok then + return false, nil, lerror.make(err) + end + assert(conn.vconnect == nil) local net_status, storage_status, retval, error_object = pcall(conn.call, conn, func, args, opts) if not net_status then @@ -1031,7 +1194,8 @@ replicaset_mt.__index = index local replica_mt = { __index = { is_connected = function(replica) - return replica.conn and replica.conn:is_connected() + return replica.conn and replica.conn:is_connected() and + conn_vconnect_check_or_close(replica.conn) end, safe_uri = function(replica) local uri = luri.parse(replica.uri) diff --git a/vshard/storage/init.lua b/vshard/storage/init.lua index 6603189e..cdb5fec8 100644 --- a/vshard/storage/init.lua +++ b/vshard/storage/init.lua @@ -3180,6 +3180,7 @@ end local function storage_service_info() return { is_master = this_is_master(), + name = box.info.name, } end