From 63640566e1ab956fcc49161ff28a167db0b3cabc Mon Sep 17 00:00:00 2001 From: Nikita Zheleztsov Date: Tue, 19 Dec 2023 21:11:30 +0300 Subject: [PATCH] replicaset: introduce name validation In case of name identification, no UUID may be passed at all, so we cannot verify only UUID, when connecting to storage. It seems impossible to extend the current net.box greeting by exposing net_box.conn.name to it, as iproto greeting doesn't have enough free space to save 64 bit instance name. So, we should deal with name validation on vshard side. For this, conn.vconnect is introduced. It's asynchronous vshard greeting, saved inside netbox connection. It stores future object and additional info, needed for its work. Future is initialized, when the connection is established (inside netbox_on_connect). The connection cannot be considered "connected" until vconnect is properly validated. Currently only instance_name is validated inside conn.vconnect. Closes #426 NO_DOC=internal --- test/replicaset-luatest/replicaset_3_test.lua | 12 +- test/replicaset-luatest/vconnect_test.lua | 189 ++++++++++++++++++ test/storage/storage.result | 8 +- test/storage/storage.test.lua | 4 +- vshard/error.lua | 10 + vshard/replicaset.lua | 166 ++++++++++++++- vshard/storage/init.lua | 1 + 7 files changed, 381 insertions(+), 9 deletions(-) create mode 100644 test/replicaset-luatest/vconnect_test.lua diff --git a/test/replicaset-luatest/replicaset_3_test.lua b/test/replicaset-luatest/replicaset_3_test.lua index 19b4776f..8016bd96 100644 --- a/test/replicaset-luatest/replicaset_3_test.lua +++ b/test/replicaset-luatest/replicaset_3_test.lua @@ -3,6 +3,7 @@ local t = require('luatest') local vreplicaset = require('vshard.replicaset') local vtest = require('test.luatest_helpers.vtest') local verror = require('vshard.error') +local vutil = require('vshard.util') local small_timeout_opts = {timeout = 0.05} local timeout_opts = {timeout = vtest.wait_timeout} @@ -247,6 +248,7 @@ test_group.test_locate_master_when_no_conn_object = function(g) end test_group.test_named_replicaset = function(g) + t.run_only_if(vutil.feature.persistent_names) local new_cfg_template = table.deepcopy(cfg_template) new_cfg_template.identification_mode = 'name_as_key' new_cfg_template.sharding['replicaset'] = new_cfg_template.sharding[1] @@ -267,9 +269,15 @@ test_group.test_named_replicaset = function(g) t.assert_equals(rs.id, rs.name) t.assert_equals(replica_1_a.id, replica_1_a.name) - -- Just to be sure, that it works. + -- Name is not set, name mismatch error. + local ret, err = rs:callrw('get_uuid', {}, {timeout = 5}) + t.assert_equals(err.name, 'INSTANCE_NAME_MISMATCH') + t.assert_equals(ret, nil) + + -- Set name, everything works from now on. + g.replica_1_a:exec(function() box.cfg{instance_name = 'replica_1_a'} end) local uuid_a = g.replica_1_a:instance_uuid() - local ret, err = rs:callrw('get_uuid', {}, timeout_opts) + ret, err = rs:callrw('get_uuid', {}, timeout_opts) t.assert_equals(err, nil) t.assert_equals(ret, uuid_a) diff --git a/test/replicaset-luatest/vconnect_test.lua b/test/replicaset-luatest/vconnect_test.lua new file mode 100644 index 00000000..b00ce872 --- /dev/null +++ b/test/replicaset-luatest/vconnect_test.lua @@ -0,0 +1,189 @@ +local fiber = require('fiber') +local t = require('luatest') +local vreplicaset = require('vshard.replicaset') +local vtest = require('test.luatest_helpers.vtest') +local vutil = require('vshard.util') +local verror = require('vshard.error') + +local small_timeout_opts = {timeout = 0.01} +local timeout_opts = {timeout = vtest.wait_timeout} + +local test_group = t.group('vconnect') + +local cfg_template = { + sharding = { + replicaset = { + replicas = { + replica = { + master = true, + }, + }, + }, + }, + bucket_count = 20, + test_user_grant_range = 'super', + replication_timeout = 0.1, + identification_mode = 'name_as_key', +} +local global_cfg + +test_group.before_all(function(g) + t.run_only_if(vutil.feature.persistent_names) + global_cfg = vtest.config_new(cfg_template) + vtest.cluster_new(g, global_cfg) + vtest.cluster_bootstrap(g, global_cfg) + vtest.cluster_wait_vclock_all(g) +end) + +test_group.after_all(function(g) + g.cluster:stop() +end) + +-- +-- Test, that conn_vconnect_wait fails to get correct +-- result. Connection should be closed. +-- +test_group.test_vconnect_no_result = function(g) + local _, rs = next(vreplicaset.buildall(global_cfg)) + g.replica:exec(function() + rawset(_G, '_call', ivshard.storage._call) + ivshard.storage._call = nil + end) + + -- Drop connection in order to make replicaset to recreate it. + rs.master.conn = nil + local ret, err = rs:callrw('get_uuid', {}, timeout_opts) + t.assert_str_contains(err.message, "_call' is not defined") + t.assert_equals(ret, nil) + -- Critical error, connection should be closed. + t.assert_equals(rs.master.conn.state, 'closed') + + g.replica:exec(function() + ivshard.storage._call = _G._call + end) +end + +-- +-- Test, that conn_vconnect_wait fails, when future is nil. +-- +test_group.test_vconnect_no_future = function(g) + local _, rs = next(vreplicaset.buildall(global_cfg)) + g.replica:exec(function() + rawset(_G, '_call', ivshard.storage._call) + rawset(_G, 'do_sleep', true) + -- Future should not appear at all. + ivshard.storage._call = function() + while _G.do_sleep do + ifiber.sleep(0.1) + end + end + end) + + rs.master.conn = nil + local ret, err = rs:callrw('get_uuid', {}, small_timeout_opts) + t.assert(verror.is_timeout(err)) + t.assert_equals(ret, nil) + t.assert_not_equals(rs.master.conn.state, 'closed') + + g.replica:exec(function() + _G.do_sleep = false + ivshard.storage._call = _G._call + end) +end + +-- +-- Test, that conn_vconnect_check fails, when future's result is nil. +-- +test_group.test_vconnect_check_no_future = function(g) + local _, rs = next(vreplicaset.buildall(global_cfg)) + g.replica:exec(function() + rawset(_G, '_call', ivshard.storage._call) + ivshard.storage._call = nil + end) + + rs.master.conn = nil + local opts = table.deepcopy(timeout_opts) + opts.is_async = true + t.helpers.retrying({}, function() + -- It may be VHANDSHAKE_NOT_COMPLETE error, when future + -- is not ready. But at the end it must be the actual error. + local ret, err = rs:callrw('get_uuid', {}, opts) + t.assert_str_contains(err.message, "_call' is not defined") + t.assert_equals(ret, nil) + t.assert_equals(rs.master.conn.state, 'closed') + end) + + g.replica:exec(function() + ivshard.storage._call = _G._call + end) +end + +-- +-- 1. Change name and stop replica. +-- 2. Wait for error_reconnect timeout. +-- 3. Assert, that on reconnect name change is noticed. +-- +test_group.test_vconnect_on_reconnect = function(g) + local _, rs = next(vreplicaset.buildall(global_cfg)) + t.assert_not_equals(rs:connect_master(), nil) + -- Configuration to use after restart. + local new_cfg = table.deepcopy(global_cfg) + local cfg_rs = new_cfg.sharding.replicaset + cfg_rs.replicas.bad = cfg_rs.replicas.replica + cfg_rs.replicas.replica = nil + + g.replica:exec(function() + box.cfg{instance_name = 'bad', force_recovery = true} + end) + g.replica:stop() + t.helpers.retrying({}, function() + t.assert_equals(rs.master.conn.state, 'error_reconnect') + end) + + -- Replica cannot be started with incorrect name, change box.cfg. + g.replica.box_cfg.instance_name = 'bad' + g.replica:start() + vtest.cluster_cfg(g, new_cfg) + local ret, err = rs:callrw('get_uuid', {}, timeout_opts) + t.assert_equals(err.name, 'INSTANCE_NAME_MISMATCH') + t.assert_equals(ret, nil) + t.assert_equals(rs.master.conn.state, 'closed') + + g.replica:exec(function() + box.cfg{instance_name = 'replica', force_recovery = true} + end) + vtest.cluster_cfg(g, global_cfg) +end + +-- +-- Test, that async call doesn't yield and immediately fails. +-- +test_group.test_async_no_yield = function(g) + local _, rs = next(vreplicaset.buildall(global_cfg)) + g.replica:exec(function() + rawset(_G, '_call', ivshard.storage._call) + rawset(_G, 'do_sleep', true) + -- Future should not appear at all. + ivshard.storage._call = function() + while _G.do_sleep do + ifiber.sleep(0.1) + end + end + end) + + local opts = table.deepcopy(timeout_opts) + opts.is_async = true + local csw1 = fiber.self():csw() + local ret, err = rs:callrw('get_uuid', {}, opts) + local csw2 = fiber.self():csw() + -- Waiting for #456 to be fixed. + t.assert_equals(csw2, csw1 + 1) + t.assert_str_contains(err.name, 'VHANDSHAKE_NOT_COMPLETE') + t.assert_equals(ret, nil) + t.assert_not_equals(rs.master.conn.state, 'closed') + + g.replica:exec(function() + _G.do_sleep = false + ivshard.storage._call = _G._call + end) +end diff --git a/test/storage/storage.result b/test/storage/storage.result index 37c6ac78..8cd1cdf4 100644 --- a/test/storage/storage.result +++ b/test/storage/storage.result @@ -1014,16 +1014,16 @@ vshard.storage.internal.errinj.ERRINJ_RECOVERY_PAUSE = false -- -- Internal info function. -- -vshard.storage._call('info') +vshard.storage._call('info').is_master --- -- is_master: true +- true ... _ = test_run:switch('storage_1_b') --- ... -vshard.storage._call('info') +vshard.storage._call('info').is_master --- -- is_master: false +- false ... -- -- gh-123, gh-298: storage auto-enable/disable depending on instance state. diff --git a/test/storage/storage.test.lua b/test/storage/storage.test.lua index 8fd8cd26..6c3b5515 100644 --- a/test/storage/storage.test.lua +++ b/test/storage/storage.test.lua @@ -327,9 +327,9 @@ vshard.storage.internal.errinj.ERRINJ_RECOVERY_PAUSE = false -- -- Internal info function. -- -vshard.storage._call('info') +vshard.storage._call('info').is_master _ = test_run:switch('storage_1_b') -vshard.storage._call('info') +vshard.storage._call('info').is_master -- -- gh-123, gh-298: storage auto-enable/disable depending on instance state. diff --git a/vshard/error.lua b/vshard/error.lua index 3ab0832e..1d78d7e2 100644 --- a/vshard/error.lua +++ b/vshard/error.lua @@ -202,6 +202,16 @@ local error_message_template = { msg = 'Bucket %s update is invalid: %s', args = {'bucket_id', 'reason'}, }, + [40] = { + name = 'VHANDSHAKE_NOT_COMPLETE', + msg = 'Handshake with %s have not been completed yet', + args = {'replica'}, + }, + [41] = { + name = 'INSTANCE_NAME_MISMATCH', + msg = 'Mismatch server name: expected "%s", but got "%s"', + args = {'expected_name', 'actual_name'}, + }, } -- diff --git a/vshard/replicaset.lua b/vshard/replicaset.lua index b1e2defe..e213fc80 100644 --- a/vshard/replicaset.lua +++ b/vshard/replicaset.lua @@ -75,6 +75,144 @@ local fiber_cond_wait = util.fiber_cond_wait local future_wait = util.future_wait local gsc = util.generate_self_checker +-- +-- vconnect is asynchronous vshard greeting, saved inside netbox connection. +-- It stores future object and additional info, needed for its work. +-- Future is initialized, when the connection is established (inside +-- netbox_on_connect). The connection cannot be considered "connected" +-- until vconnect is properly validated. +-- +local function conn_vconnect_set(conn) + assert(conn.replica ~= nil) + local is_named = conn.replica.id == conn.replica.name + if not is_named then + -- Nothing to do. Check is not needed. + return + end + -- Update existing vconnect. Fiber condition cannot be dropped, + -- somebody may already waiting on it. + if conn.vconnect then + -- Connections are preserved during reconfiguration, + -- identification may be changed. + conn.vconnect.is_named = is_named + if not conn.vconnect.future then + return + end + -- Future object must be updated. Old result is irrelevant. + conn.vconnect.future:discard() + conn.vconnect.future = nil + return + end + -- Create new vconnect. + conn.vconnect = { + -- Whether the connection is done to the named replica. + is_named = is_named, + -- Used to wait for the appearance of vconnect.future object, + -- if call is done before the connection is established. + future_cond = fiber.cond(), + } +end + +-- +-- Initialize future object. Should be done, when connection is established +-- (inside netbox_on_connect). +-- +local function conn_vconnect_start(conn) + local vconn = conn.vconnect + if not vconn then + -- Nothing to do. Check is not needed. + return + end + + local opts = {is_async = true} + vconn.future = conn:call('vshard.storage._call', {'info'}, opts) + vconn.future_cond:broadcast() +end + +-- +-- Check, that future is ready, and its result is expected. +-- The function doesn't yield. +-- @retval true; The correct response is received. +-- @retval nil, ...; Response is not received or validation error happened. +-- +local function conn_vconnect_check(conn) + local vconn = conn.vconnect + -- conn.vconnect may be nil, if connection was created on old version + -- and the storage was reloaded to a new one. It's also nil, when + -- all checks were already done. + if not vconn then + return true + end + -- Nothing to do, but wait in such case. + if not vconn.future or not vconn.future:is_ready() then + return nil, lerror.vshard(lerror.code.VHANDSHAKE_NOT_COMPLETE, + conn.replica.id) + end + -- Critical errors. Connection should be closed after these ones. + local result, err = vconn.future:result() + if not result then + -- Failed to get response. E.g. access error. + return nil, lerror.make(err) + end + if vconn.is_named and result[1].name ~= conn.replica.name then + return nil, lerror.vshard(lerror.code.INSTANCE_NAME_MISMATCH, + conn.replica.name, result[1].name) + end + -- Don't validate until reconnect happens. + conn.vconnect = nil + return true +end + +local function conn_vconnect_check_or_close(conn) + local ok, err = conn_vconnect_check(conn) + -- Close the connection, if error happened, but it is not + -- VSHANDSHAKE_NOT_COMPLETE. + if not ok and err and not (err.type == 'ShardingError' and + err.code == lerror.code.VHANDSHAKE_NOT_COMPLETE) then + conn:close() + end + return ok, err +end + +-- +-- Wait until the future object is ready. Returns remaining timeout. +-- @retval timeout; Future boject is ready. +-- @retval nil, err; Timeout passed. +-- +local function conn_vconnect_wait(conn, timeout) + local vconn = conn.vconnect + -- Fast path. In most cases no validation should be done at all. + -- conn.vconnect may be nil, if connection was created on old version + -- and the storage was reloaded to a new one. It's also nil, when + -- all checks were already done. + if not vconn or (vconn.future and vconn.future:is_ready()) then + return timeout + end + local deadline = fiber_clock() + timeout + -- Wait for connection to be established. + if not vconn.future and + not fiber_cond_wait(vconn.future_cond, timeout) then + return nil, lerror.timeout() + end + timeout = deadline - fiber_clock() + -- Wait for async call to return. + local res, err = future_wait(vconn.future, timeout) + if res == nil then + -- Either timeout error or any other. If it's not a timeout error, + -- then conn must be recteated, handshake must be retried. + return nil, lerror.make(err) + end + return deadline - fiber_clock() +end + +local function conn_vconnect_wait_or_close(conn, timeout) + local ok, err = conn_vconnect_wait(conn, timeout) + if not ok and not lerror.is_timeout(err) then + conn:close() + end + return ok, err +end + -- -- on_connect() trigger for net.box -- @@ -86,6 +224,11 @@ local function netbox_on_connect(conn) -- If a replica's connection has revived, then unset -- replica.down_ts - it is not down anymore. replica.down_ts = nil + -- conn.vconnect is set on every connect, as it may be nil, + -- if previous check successfully passed. Also, the needed + -- checks may change on reconnect. + conn_vconnect_set(conn) + conn_vconnect_start(conn) if replica.uuid and conn.peer_uuid ~= replica.uuid and -- XXX: Zero UUID means not a real Tarantool instance. It -- is likely to be a cartridge.remote-control server, @@ -222,6 +365,11 @@ local function replicaset_connect_to_replica(replicaset, replica) }) conn.replica = replica conn.replicaset = replicaset + -- vconnect must be set before the time, connection is established, + -- as we must know, that the connection cannot be used. If vconnect + -- is nil, it means all checks passed, so we may make a call and + -- only after that 'require' checks. + conn_vconnect_set(conn) conn.on_connect_ref = netbox_on_connect conn:on_connect(netbox_on_connect) conn.on_disconnect_ref = netbox_on_disconnect @@ -421,6 +569,21 @@ local function replica_call(replica, func, args, opts) assert(opts and opts.timeout) replica.activity_ts = fiber_clock() local conn = replica.conn + if not opts.is_async then + -- Async call cannot yield. So, we cannot wait for the connection + -- to be established and validate vconnect. Immediately fail below, + -- in conn_vconnect_check_or_close, if something is wrong. + local timeout, err = conn_vconnect_wait_or_close(conn, opts.timeout) + if not timeout then + return false, nil, lerror.make(err) + end + opts.timeout = timeout + end + local ok, err = conn_vconnect_check_or_close(conn) + if not ok then + return false, nil, lerror.make(err) + end + assert(conn.vconnect == nil) local net_status, storage_status, retval, error_object = pcall(conn.call, conn, func, args, opts) if not net_status then @@ -1031,7 +1194,8 @@ replicaset_mt.__index = index local replica_mt = { __index = { is_connected = function(replica) - return replica.conn and replica.conn:is_connected() + return replica.conn and replica.conn:is_connected() and + conn_vconnect_check_or_close(replica.conn) end, safe_uri = function(replica) local uri = luri.parse(replica.uri) diff --git a/vshard/storage/init.lua b/vshard/storage/init.lua index 6603189e..cdb5fec8 100644 --- a/vshard/storage/init.lua +++ b/vshard/storage/init.lua @@ -3180,6 +3180,7 @@ end local function storage_service_info() return { is_master = this_is_master(), + name = box.info.name, } end