From c97a37dcffe9bb286b2a57f1d1037bc99c4a1a8d Mon Sep 17 00:00:00 2001
From: Vladislav Shpilevoy <v.shpilevoy@tarantool.org>
Date: Fri, 6 May 2022 23:45:55 +0200
Subject: [PATCH] cfg: support multilisten and SSL

SSL can't be properly tested while there is a crash in EE:
https://github.com/tarantool/tarantool-ee/issues/109.

Part of #325

@TarantoolBot document
Title: vshard: multilisten and SSL

Multilisten is a feature which allows Tarantool to listen on multiple
ports at the same time (>= Tarantool 2.10.0-beta2). SSL is a Tarantool
Enterprise feature which allows encrypting the traffic.

The features are united by the fact that `box.cfg.listen` is no longer
equal to what is used for connecting to the instance. With multilisten,
`box.cfg.listen` is an array of URIs, while `replica_object.uri` in the
vshard config must remain a single URI. With SSL, `box.cfg.listen` must
carry a certificate, a private key, and optionally a password. The
connect-URI can have no options at all, or carry a CA certificate to
validate the server, or carry its own certificate and private key to be
validated by the server. Other parameter combinations are possible as
well.

There are two ways to use these features. The first way: in the common
section of the vshard config specify only the `replica_object.uri`
values. Then on the needed storages pass the `listen` option in the
root of the config. This works for all `box.cfg` options, not only for
`listen`. Example of multilisten usage:
```Lua
-- For storage_1_a:
{
    sharding = {
        ['storage_1_uuid'] = {
            replicas = {
                ['storage_1_a_uuid'] = {
                    uri = 'storage:storage@127.0.0.1:3301',
                    name = 'storage_1_a',
                },
            },
        },
    },
    listen = {
        'storage:storage@127.0.0.1:3301',
        'storage:storage@127.0.0.1:3302',
    },
}

-- For other storages and all routers:
{
    sharding = {
        ['storage_1_uuid'] = {
            replicas = {
                ['storage_1_a_uuid'] = {
                    uri = 'storage:storage@127.0.0.1:3301',
                    name = 'storage_1_a',
                },
            },
        },
    },
}
```
The same approach works for SSL. The second way: use the new config
option `replica_object.listen`. Example of SSL usage (Tarantool
Enterprise only):
```Lua
{
    sharding = {
        ['storage_1_uuid'] = {
            replicas = {
                ['storage_1_a_uuid'] = {
                    uri = {
                        'storage:storage@127.0.0.1:3301',
                        params = {
                            transport = 'ssl',
                            ssl_ca_file = ca_file,
                        }
                    },
                    listen = {
                        'storage:storage@127.0.0.1:3301',
                        params = {
                            transport = 'ssl',
                            ssl_cert_file = cert_1_a_file,
                            ssl_key_file = key_1_a_file,
                        }
                    },
                    name = 'storage_1_a',
                },
            },
        },
    },
}
```
The same approach works for multilisten. All routers and other storages
use the value in `uri` to connect to the instance. The instance itself
uses the value from `listen` for `box.cfg.listen`.
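For clarity, here is a sketch of the second way applied to multilisten,
mirroring the SSL example above (the UUIDs, ports, and credentials are
placeholders): `listen` is an array of URIs to bind, while `uri` stays
the single URI the other nodes use for connecting.
```Lua
{
    sharding = {
        ['storage_1_uuid'] = {
            replicas = {
                ['storage_1_a_uuid'] = {
                    uri = 'storage:storage@127.0.0.1:3301',
                    listen = {
                        'storage:storage@127.0.0.1:3301',
                        'storage:storage@127.0.0.1:3302',
                    },
                    name = 'storage_1_a',
                },
            },
        },
    },
}
```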
VShard supports multilisten and SSL since 0.1.20 (not released yet).
---
 test/instances/storage.lua          |  23 +++++
 test/luatest_helpers/vtest.lua      | 134 ++++++++++++++++++++++++++--
 test/router-luatest/router_test.lua |  99 +++++++++++++++++++-
 test/unit-luatest/config_test.lua   |  46 +++++++++-
 vshard/cfg.lua                      |  24 ++++-
 vshard/replicaset.lua               |   2 +-
 vshard/storage/init.lua             |   6 +-
 vshard/util.lua                     |   2 +
 8 files changed, 323 insertions(+), 13 deletions(-)

diff --git a/test/instances/storage.lua b/test/instances/storage.lua
index 2afc9422..371afe5c 100755
--- a/test/instances/storage.lua
+++ b/test/instances/storage.lua
@@ -1,5 +1,11 @@
 #!/usr/bin/env tarantool
 local helpers = require('test.luatest_helpers')
+
+--
+-- Commonly used libraries.
+--
+_G.fiber = require('fiber')
+
 -- Do not load entire vshard into the global namespace to catch errors when
 -- code relies on that.
 _G.vshard = {
@@ -12,6 +18,7 @@ if box.ctl.set_on_shutdown_timeout then
 end
 
 box.cfg(helpers.box_cfg())
+local instance_uuid = box.info.uuid
 box.schema.user.grant('guest', 'super', nil, nil, {if_not_exists = true})
 
 local function box_error()
@@ -22,7 +29,23 @@ local function echo(...)
     return ...
 end
 
+local function get_uuid()
+    return instance_uuid
+end
+
+local function session_set(key, value)
+    box.session.storage[key] = value
+    return true
+end
+
+local function session_get(key)
+    return box.session.storage[key]
+end
+
 _G.box_error = box_error
 _G.echo = echo
+_G.get_uuid = get_uuid
+_G.session_set = session_set
+_G.session_get = session_get
 
 _G.ready = true
diff --git a/test/luatest_helpers/vtest.lua b/test/luatest_helpers/vtest.lua
index e5da876b..274f3f48 100644
--- a/test/luatest_helpers/vtest.lua
+++ b/test/luatest_helpers/vtest.lua
@@ -1,7 +1,18 @@
+local t = require('luatest')
 local helpers = require('test.luatest_helpers')
 local cluster = require('test.luatest_helpers.cluster')
+local const = require('vshard.consts')
 
 local uuid_idx = 1
+--
+-- The maps help to preserve the same UUIDs for replicas and replicasets
+-- across reconfiguration. Reconfig means an update of a cfg template which
+-- doesn't contain UUIDs + generation of a new real cfg to apply on the nodes.
+-- The real cfg needs to keep the same UUIDs for the nodes used in the old
+-- versions of the template.
+--
+local replica_name_to_uuid_map = {}
+local replicaset_name_to_uuid_map = {}
 
 --
 -- New UUID unique per this process. Generation is not random - for simplicity
@@ -14,6 +25,23 @@ local function uuid_next()
     return '00000000-0000-0000-0000-'..string.rep('0', 12 - #last)..last
 end
 
+local function name_to_uuid(map, name)
+    local res = map[name]
+    if not res then
+        res = uuid_next()
+        map[name] = res
+    end
+    return res
+end
+
+local function replica_name_to_uuid(name)
+    return name_to_uuid(replica_name_to_uuid_map, name)
+end
+
+local function replicaset_name_to_uuid(name)
+    return name_to_uuid(replicaset_name_to_uuid_map, name)
+end
+
 --
 -- Build a valid vshard config from a template. A template does not specify
 -- anything volatile such as URIs, UUIDs - these are installed at runtime.
@@ -22,16 +50,33 @@ local function config_new(templ)
     local res = table.deepcopy(templ)
     local sharding = {}
     res.sharding = sharding
+    -- Supposed to speed up reconnects when the replication and listen URIs
+    -- change.
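+    -- (Tarantool retries a broken replication connection about once per
+    -- replication_timeout, so a small value makes the nodes pick up URI
+    -- changes almost immediately.)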
+    res.replication_timeout = 0.1
     for i, replicaset_templ in pairs(templ.sharding) do
-        local replicaset_uuid = uuid_next()
+        local replicaset_uuid = replicaset_name_to_uuid(i)
         local replicas = {}
         local replicaset = table.deepcopy(replicaset_templ)
         replicaset.replicas = replicas
         for replica_name, replica_templ in pairs(replicaset_templ.replicas) do
-            local replica_uuid = uuid_next()
+            local replica_uuid = replica_name_to_uuid(replica_name)
             local replica = table.deepcopy(replica_templ)
+            replica.port_uri = nil
+            replica.port_count = nil
             replica.name = replica_name
-            replica.uri = 'storage:storage@'..helpers.instance_uri(replica_name)
+
+            local port_count = replica_templ.port_count
+            local creds = 'storage:storage@'
+            if port_count == nil then
+                replica.uri = creds..helpers.instance_uri(replica_name)
+            else
+                local listen = table.new(port_count, 0)
+                for j = 1, port_count do
+                    listen[j] = creds..helpers.instance_uri(replica_name..j)
+                end
+                replica.listen = listen
+                replica.uri = listen[replica_templ.port_uri]
+            end
             replicas[replica_uuid] = replica
         end
         sharding[replicaset_uuid] = replicaset
@@ -73,6 +118,11 @@ local function storage_new(g, cfg)
             box_cfg = box_cfg,
         }, 'storage.lua')
         g[name] = server
+        -- VShard-specific details to use in various helper functions.
+        server.vtest = {
+            name = name,
+            is_storage = true,
+        }
         g.cluster:add_server(server)
 
         table.insert(all_servers, server)
@@ -109,6 +159,59 @@
     end
 end
 
+--
+-- Apply the config to all vshard storages in the cluster.
+--
+local function storage_cfg(g, cfg)
+    -- No support yet for dynamic node addition and removal. Only reconfig.
+    local fids = {}
+    local storages = {}
+    -- Map-reduce. It makes reconfig not only faster, but also independent of
+    -- the order in which the nodes are configured. For example, there might
+    -- be a config which makes the master hang until some replica is
+    -- configured first. When all are configured in parallel, the order won't
+    -- matter.
+    for _, storage in pairs(g.cluster.servers) do
+        if storage.vtest and storage.vtest.is_storage then
+            table.insert(storages, storage)
+            table.insert(fids, storage:exec(function(cfg)
+                local f = fiber.new(vshard.storage.cfg, cfg, box.info.uuid)
+                f:set_joinable(true)
+                return f:id()
+            end, {cfg}))
+        end
+    end
+    local errors = {}
+    for i, storage in pairs(storages) do
+        local ok, err = storage:exec(function(fid)
+            return fiber.find(fid):join()
+        end, {fids[i]})
+        if not ok then
+            errors[storage.vtest.name] = err
+        end
+    end
+    t.assert_equals(errors, {}, 'storage reconfig')
+end
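+
+-- A typical usage pattern (see router_test.lua below): regenerate the config
+-- from an updated template and apply it to the whole cluster at once:
+--
+--     local new_cfg = vtest.config_new(new_cfg_template)
+--     vtest.storage_cfg(g, new_cfg)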
+
+--
+-- Find the first active bucket on the storage. It helps tests to avoid
+-- assumptions about where the buckets are located via hardcoded numbers and
+-- UUIDs.
+--
+local function storage_first_bucket(storage)
+    return storage:exec(function(status)
+        local res = box.space._bucket.index.status:min(status)
+        return res ~= nil and res.id or nil
+    end, {const.BUCKET.ACTIVE})
+end
+
+--
+-- Apply the config on the given router.
+--
+local function router_cfg(router, cfg)
+    router:exec(function(cfg)
+        vshard.router.cfg(cfg)
+    end, {cfg})
+end
+
 --
 -- Create a new router in the cluster.
 --
@@ -122,14 +225,33 @@ local function router_new(g, name, cfg)
     g[name] = server
     g.cluster:add_server(server)
     server:start()
-    server:exec(function(cfg)
-        vshard.router.cfg(cfg)
-    end, {cfg})
+    router_cfg(server, cfg)
     return server
 end
 
+--
+-- Disconnect the router from all storages.
+--
+local function router_disconnect(router)
+    router:exec(function()
+        local replicasets = vshard.router.static.replicasets
+        for _, rs in pairs(replicasets) do
+            for _, r in pairs(rs.replicas) do
+                local c = r.conn
+                if c then
+                    c:close()
+                end
+            end
+        end
+    end)
+end
+
 return {
     config_new = config_new,
     storage_new = storage_new,
+    storage_cfg = storage_cfg,
+    storage_first_bucket = storage_first_bucket,
     router_new = router_new,
+    router_cfg = router_cfg,
+    router_disconnect = router_disconnect,
 }
diff --git a/test/router-luatest/router_test.lua b/test/router-luatest/router_test.lua
index 988a80c1..8ad0d915 100644
--- a/test/router-luatest/router_test.lua
+++ b/test/router-luatest/router_test.lua
@@ -4,7 +4,7 @@ local vutil = require('vshard.util')
 
 local wait_timeout = 120
 
 local g = t.group('router')
-local cluster_cfg = vtest.config_new({
+local cfg_template = {
     sharding = {
         {
             replicas = {
             },
         },
     },
     bucket_count = 100
-})
+}
+local cluster_cfg = vtest.config_new(cfg_template)
+
+local function callrw_get_uuid(bid, timeout)
+    return vshard.router.callrw(bid, 'get_uuid', {}, {timeout = timeout})
+end
+
+local function callrw_session_get(bid, key, timeout)
+    return vshard.router.callrw(bid, 'session_get', {key},
+                                {timeout = timeout})
+end
+
+local function callrw_session_set(bid, key, value, timeout)
+    return vshard.router.callrw(bid, 'session_set', {key, value},
+                                {timeout = timeout})
+end
 
 g.before_all(function()
     vtest.storage_new(g, cluster_cfg)
@@ -261,3 +276,83 @@ g.test_map_callrw_raw = function(g)
         _G.do_map = nil
     end)
 end
+
+g.test_multilisten = function(g)
+    t.run_only_if(vutil.feature.multilisten)
+
+    local bid = vtest.storage_first_bucket(g.replica_1_a)
+
+    -- Set 2 listen ports on the master.
+    local new_cfg_template = table.deepcopy(cfg_template)
+    local rs_1_templ = new_cfg_template.sharding[1]
+    local rep_1_a_templ = rs_1_templ.replicas.replica_1_a
+    rep_1_a_templ.port_count = 2
+    -- Clients should use the first port.
+    rep_1_a_templ.port_uri = 1
+    local new_storage_cfg = vtest.config_new(new_cfg_template)
+    vtest.storage_cfg(g, new_storage_cfg)
+
+    -- Router connects to the first port.
+    local new_router_cfg = vtest.config_new(new_cfg_template)
+    vtest.router_cfg(g.router, new_router_cfg)
+
+    local rep_1_a_uuid = g.replica_1_a:instance_uuid()
+    local res, err = g.router:exec(callrw_get_uuid, {bid, wait_timeout})
+    t.assert_equals(err, nil, 'no error')
+    t.assert_equals(res, rep_1_a_uuid, 'went to 1_a')
+
+    -- Save a key in the session to check later for a reconnect.
+    res, err = g.router:exec(callrw_session_set, {bid, 1, 10, wait_timeout})
+    t.assert_equals(err, nil, 'no error')
+    t.assert(res, 'set session key')
+
+    -- The key is actually saved.
+    res, err = g.router:exec(callrw_session_get, {bid, 1, wait_timeout})
+    t.assert_equals(err, nil, 'no error')
+    t.assert_equals(res, 10, 'get session key')
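+
+    -- box.session.storage is bound to the connection: it lives only as long
+    -- as the session does. That is why the key above works as a reconnect
+    -- detector - it disappears when the router re-establishes the connection.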
+
+    -- Router connects to the second port. The storage's cfg is intentionally
+    -- unchanged.
+    rep_1_a_templ.port_uri = 2
+    new_router_cfg = vtest.config_new(new_cfg_template)
+    vtest.router_cfg(g.router, new_router_cfg)
+
+    res, err = g.router:exec(callrw_get_uuid, {bid, wait_timeout})
+    t.assert_equals(err, nil, 'no error')
+    t.assert_equals(res, rep_1_a_uuid, 'went to 1_a again')
+
+    -- There was a reconnect - the session is new.
+    res, err = g.router:exec(callrw_session_get, {bid, 1, wait_timeout})
+    t.assert_equals(err, nil, 'no error')
+    t.assert_equals(res, nil, 'no session key')
+
+    -- To confirm that the router uses the second port, shut it down on the
+    -- storage. The router won't be able to reconnect.
+    rep_1_a_templ.port_count = 1
+    rep_1_a_templ.port_uri = 1
+    new_storage_cfg = vtest.config_new(new_cfg_template)
+    vtest.storage_cfg(g, new_storage_cfg)
+    -- Force a router reconnect. Otherwise the router would keep using the
+    -- old, still alive connection even though the original listening socket
+    -- is closed above.
+    vtest.router_disconnect(g.router)
+
+    res, err = g.router:exec(callrw_get_uuid, {bid, 0.05})
+    t.assert_equals(res, nil, 'rw failed when second port was shut down')
+    -- The error code can be anything really. There is no way to check it
+    -- reliably without depending on the OS.
+    t.assert_covers(err, {type = 'ClientError'}, 'got error')
+
+    -- Make the router connect to the first port while it still thinks there
+    -- are two ports.
+    rep_1_a_templ.port_count = 2
+    rep_1_a_templ.port_uri = 1
+    new_router_cfg = vtest.config_new(new_cfg_template)
+    vtest.router_cfg(g.router, new_router_cfg)
+    res, err = g.router:exec(callrw_get_uuid, {bid, wait_timeout})
+    t.assert_equals(err, nil, 'no error')
+    t.assert_equals(res, rep_1_a_uuid, 'went to 1_a again')
+
+    -- Restore everything back.
+    vtest.storage_cfg(g, cluster_cfg)
+    vtest.router_cfg(g.router, cluster_cfg)
+end
diff --git a/test/unit-luatest/config_test.lua b/test/unit-luatest/config_test.lua
index d74b7ce7..c2c6514c 100644
--- a/test/unit-luatest/config_test.lua
+++ b/test/unit-luatest/config_test.lua
@@ -4,7 +4,7 @@ local uuid = require('uuid')
 
 local g = t.group('config')
 
-g.test_basic_uri = function()
+g.test_replica_uri = function()
     local url = 'storage:storage@127.0.0.1:3301'
     local storage_1_a = {
         uri = url,
@@ -42,3 +42,47 @@
     storage_1_a.uri = uuid.new()
     t.assert_error(lcfg.check, config)
 end
+
+g.test_replica_listen = function()
+    local url1 = 'storage:storage@127.0.0.1:3301'
+    local url2 = 'storage:storage@127.0.0.1:3302'
+    local storage_1_a = {
+        uri = url1,
+        listen = url2,
+        name = 'storage_1_a',
+    }
+    local config = {
+        sharding = {
+            storage_1_uuid = {
+                replicas = {
+                    storage_1_a_uuid = storage_1_a,
+                }
+            },
+        },
+    }
+
+    local res = lcfg.check(config)
+    t.assert(res, 'listen and uri are both specified')
+
+    local rep_1_a = res.sharding.storage_1_uuid.replicas.storage_1_a_uuid
+    t.assert_equals(rep_1_a.uri, url1, 'uri value')
+    t.assert_equals(rep_1_a.listen, url2, 'listen value')
+
+    -- Listen by default is the same as uri.
+    storage_1_a.listen = nil
+    res = lcfg.check(config)
+    t.assert(res, 'listen is omitted')
+
+    rep_1_a = res.sharding.storage_1_uuid.replicas.storage_1_a_uuid
+    t.assert_equals(rep_1_a.uri, url1, 'uri value')
+    t.assert_equals(rep_1_a.listen, rep_1_a.uri, 'listen value')
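+
+    -- The default is installed by check_replicas() in vshard/cfg.lua: when
+    -- listen is nil, it is set to the value of uri.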
+
+    -- Listen can be multiple URIs.
+    storage_1_a.listen = {url1, url2}
+    res = lcfg.check(config)
+    t.assert(res, 'listen is array')
+
+    rep_1_a = res.sharding.storage_1_uuid.replicas.storage_1_a_uuid
+    t.assert_equals(rep_1_a.uri, url1, 'uri value')
+    t.assert_equals(rep_1_a.listen, {url1, url2}, 'listen value')
+end
diff --git a/vshard/cfg.lua b/vshard/cfg.lua
index f820b541..d2b5e60d 100644
--- a/vshard/cfg.lua
+++ b/vshard/cfg.lua
@@ -2,14 +2,25 @@
 local log = require('log')
 local luri = require('uri')
+local lutil = require('vshard.util')
 local consts = require('vshard.consts')
 
-local function check_uri(uri)
+local function check_uri_connect(uri)
     if not luri.parse(uri) then
         error('URI must be a non-empty string, or a number, or a table')
     end
 end
 
+local function check_uri_listen(uri)
+    if not lutil.feature.multilisten then
+        error('Listen URIs require a newer Tarantool version')
+    end
+    if not luri.parse_many(uri) then
+        error('URI must be a non-empty string, or a number, or a table, or an '..
+              'array of these types')
+    end
+end
+
 local function check_replica_master(master, ctx)
     if master then
         if ctx.master then
@@ -116,7 +127,13 @@ local replica_template = {
     uri = {
         type = {'non-empty string', 'number', 'table'},
         name = 'URI',
-        check = check_uri
+        check = check_uri_connect
+    },
+    listen = {
+        type = {'non-empty string', 'number', 'table'},
+        name = 'listen',
+        check = check_uri_listen,
+        is_optional = true,
     },
     name = {type = 'string', name = "Name", is_optional = true},
     zone = {type = {'string', 'number'}, name = "Zone", is_optional = true},
@@ -130,6 +147,9 @@ local function check_replicas(replicas)
     local ctx = {master = false}
     for _, replica in pairs(replicas) do
         validate_config(replica, replica_template, ctx)
+        if replica.listen == nil then
+            replica.listen = replica.uri
+        end
     end
 end
 
diff --git a/vshard/replicaset.lua b/vshard/replicaset.lua
index 76212649..31741942 100644
--- a/vshard/replicaset.lua
+++ b/vshard/replicaset.lua
@@ -4,7 +4,7 @@
 -- <replicaset> = {
 --     replicas = {
 --         [replica_uuid] = {
---             uri = string,
+--             uri = URI,
 --             name = string,
 --             uuid = string,
 --             conn = <netbox> + .replica + .replicaset,
diff --git a/vshard/storage/init.lua b/vshard/storage/init.lua
index 241f634c..80dc824c 100644
--- a/vshard/storage/init.lua
+++ b/vshard/storage/init.lua
@@ -2692,7 +2692,11 @@ local function storage_cfg(cfg, this_replica_uuid, is_reload)
     --
     -- If a master role of the replica is not changed, then
     -- 'read_only' can be set right here.
-    box_cfg.listen = box_cfg.listen or this_replica.uri
+    local this_replicaset_cfg = vshard_cfg.sharding[this_replicaset.uuid]
+    local this_replica_cfg = this_replicaset_cfg.replicas[this_replica.uuid]
+
+    box_cfg.listen = box_cfg.listen or this_replica_cfg.listen or
+                     this_replica_cfg.uri
     if not box_cfg.replication then
         box_cfg.replication = {}
         for _, replica in pairs(this_replicaset.replicas) do
diff --git a/vshard/util.lua b/vshard/util.lua
index c6975cfd..88901b2e 100644
--- a/vshard/util.lua
+++ b/vshard/util.lua
@@ -4,6 +4,7 @@ local fiber = require('fiber')
 local lerror = require('vshard.error')
 local lversion = require('vshard.version')
 local lmsgpack = require('msgpack')
+local luri = require('uri')
 
 local MODULE_INTERNALS = '__module_vshard_util'
 local M = rawget(_G, MODULE_INTERNALS)
@@ -238,6 +239,7 @@ end
 local feature = {
     msgpack_object = lmsgpack.object ~= nil,
     netbox_return_raw = version_is_at_least(2, 10, 0, 'beta', 2, 86),
+    multilisten = luri.parse_many ~= nil,
 }
 
 return {