cfg: support multilisten and SSL
SSL can't be properly tested while there is a crash in EE:
tarantool/tarantool-ee#109.

Part of #325

@TarantoolBot document
Title: vshard: multilisten and SSL

Multilisten is a feature which allows Tarantool to listen on
multiple ports at the same time (available since Tarantool 2.10.0-beta2).

SSL is a Tarantool Enterprise feature which allows encrypting the
traffic.

Both features share one property: `box.cfg.listen` is not equal to
the URI used for connecting to the instance.

In the case of multilisten, `box.cfg.listen` is an array of URIs,
while `replica_object.uri` in the vshard config must stay a single URI.
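
For illustration, a minimal sketch of that difference at the `box.cfg`
level (addresses here are arbitrary):

```Lua
-- On the storage instance itself: listen on two ports at once.
box.cfg{listen = {'127.0.0.1:3301', '127.0.0.1:3302'}}

-- In the vshard config the same replica is still described by a
-- single connect-URI, for example:
-- uri = 'storage:storage@127.0.0.1:3301'
```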

In the case of SSL, `box.cfg.listen` must carry a certificate, a
private key, and optionally a password. The connect-URI can have no
options, carry a CA certificate to validate the server, or carry its
own certificate and private key to be validated by the server. Other
combinations of parameters are possible as well.
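
For illustration, a minimal sketch of what such URIs might look like,
assuming the Enterprise URI parameters used in the examples below
(the file names are hypothetical):

```Lua
-- Listen-URI of the server: certificate, key, optional password.
box.cfg{
    listen = {
        '127.0.0.1:3301',
        params = {
            transport = 'ssl',
            ssl_cert_file = 'server.crt',
            ssl_key_file = 'server.key',
            -- ssl_password = 'secret', -- optional
        },
    },
}

-- Connect-URI of a client: only a CA certificate to validate the
-- server (or no options at all, or an own certificate and key).
local client_uri = {
    '127.0.0.1:3301',
    params = {
        transport = 'ssl',
        ssl_ca_file = 'ca.crt',
    },
}
```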

There are two ways to use these features.

The first way: in the common section of the vshard config specify
only `replica_object.uri` values. Then, on the storages that need it,
pass the `listen` option in the root of the config. This works for
all `box.cfg` options, not only for `listen`. Example of multilisten
usage:

```Lua
-- For storage_1_a:
{
    sharding = {
        ['storage_1_uuid'] = {
            replicas = {
                ['storage_1_a_uuid'] = {
                    uri = 'storage:storage@127.0.0.1:3301',
                    name = 'storage_1_a',
                },
            },
        },
    },
    listen = {
        'storage:storage@127.0.0.1:3301',
        'storage:storage@127.0.0.1:3302',
    },
}
-- For other storages and all routers:
{
    sharding = {
        ['storage_1_uuid'] = {
            replicas = {
                ['storage_1_a_uuid'] = {
                    uri = 'storage:storage@127.0.0.1:3301',
                    name = 'storage_1_a',
                },
            },
        },
    },
}
```
The same approach works for SSL.
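
For illustration, a sketch of the same first way with SSL, assuming
the Enterprise URI parameters from the example below (the certificate
file names are hypothetical):

```Lua
-- For storage_1_a: the root `listen` carries the server certificate
-- and key, the common `uri` carries only the CA certificate.
{
    sharding = {
        ['storage_1_uuid'] = {
            replicas = {
                ['storage_1_a_uuid'] = {
                    uri = {
                        'storage:storage@127.0.0.1:3301',
                        params = {
                            transport = 'ssl',
                            ssl_ca_file = 'ca.crt',
                        }
                    },
                    name = 'storage_1_a',
                },
            },
        },
    },
    listen = {
        'storage:storage@127.0.0.1:3301',
        params = {
            transport = 'ssl',
            ssl_cert_file = 'storage_1_a.crt',
            ssl_key_file = 'storage_1_a.key',
        }
    },
}
-- Other storages and all routers use the same config without the
-- root `listen` option.
```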

The second way: use the new config option `replica_object.listen`.
Example of SSL usage (Tarantool Enterprise only):

```Lua
{
    sharding = {
        ['storage_1_uuid'] = {
            replicas = {
                ['storage_1_a_uuid'] = {
                    uri = {
                        'storage:storage@127.0.0.1:3301',
                        params = {
                            transport = 'ssl',
                            ssl_ca_file = ca_file,
                        }
                    },
                    listen = {
                        'storage:storage@127.0.0.1:3301',
                        params = {
                            transport = 'ssl',
                            ssl_cert_file = cert_1_a_file,
                            ssl_key_file = key_1_a_file,
                        }
                    },
                    name = 'storage_1_a',
                },
            },
        },
    },
}
```
The same approach works for multilisten.
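
For illustration, a sketch of the same second way used for
multilisten (the ports are arbitrary):

```Lua
{
    sharding = {
        ['storage_1_uuid'] = {
            replicas = {
                ['storage_1_a_uuid'] = {
                    uri = 'storage:storage@127.0.0.1:3301',
                    listen = {
                        'storage:storage@127.0.0.1:3301',
                        'storage:storage@127.0.0.1:3302',
                    },
                    name = 'storage_1_a',
                },
            },
        },
    },
}
```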

All routers and other storages will use the value in `uri` to
connect to the instance. The instance itself will use the value
from `listen` for `box.cfg.listen`.

VShard supports multilisten and SSL since 0.1.20 (not released
yet).
Gerold103 committed May 10, 2022
1 parent 7754044 commit c97a37d
Showing 8 changed files with 323 additions and 13 deletions.
23 changes: 23 additions & 0 deletions test/instances/storage.lua
@@ -1,5 +1,11 @@
#!/usr/bin/env tarantool
local helpers = require('test.luatest_helpers')

--
-- Commonly used libraries.
--
_G.fiber = require('fiber')

-- Do not load entire vshard into the global namespace to catch errors when code
-- relies on that.
_G.vshard = {
@@ -12,6 +18,7 @@ if box.ctl.set_on_shutdown_timeout then
end

box.cfg(helpers.box_cfg())
local instance_uuid = box.info.uuid
box.schema.user.grant('guest', 'super', nil, nil, {if_not_exists = true})

local function box_error()
@@ -22,7 +29,23 @@ local function echo(...)
return ...
end

local function get_uuid()
return instance_uuid
end

local function session_set(key, value)
box.session.storage[key] = value
return true
end

local function session_get(key)
return box.session.storage[key]
end

_G.box_error = box_error
_G.echo = echo
_G.get_uuid = get_uuid
_G.session_set = session_set
_G.session_get = session_get

_G.ready = true
134 changes: 128 additions & 6 deletions test/luatest_helpers/vtest.lua
@@ -1,7 +1,18 @@
local t = require('luatest')
local helpers = require('test.luatest_helpers')
local cluster = require('test.luatest_helpers.cluster')
local const = require('vshard.consts')

local uuid_idx = 1
--
-- The maps help to preserve the same UUID for replicas and replicasets during
-- reconfiguration. Reconfig means an update of a cfg template which doesn't
-- contain UUIDs + generation of a new real cfg to apply on nodes. The real cfg
-- needs to have same UUIDs for the nodes used in the old versions of the
-- template.
--
local replica_name_to_uuid_map = {}
local replicaset_name_to_uuid_map = {}

--
-- New UUID unique per this process. Generation is not random - for simplicity
@@ -14,6 +25,23 @@ local function uuid_next()
return '00000000-0000-0000-0000-'..string.rep('0', 12 - #last)..last
end

local function name_to_uuid(map, name)
local res = map[name]
if not res then
res = uuid_next()
map[name] = res
end
return res
end

local function replica_name_to_uuid(name)
return name_to_uuid(replica_name_to_uuid_map, name)
end

local function replicaset_name_to_uuid(name)
return name_to_uuid(replicaset_name_to_uuid_map, name)
end

--
-- Build a valid vshard config by a template. A template does not specify
-- anything volatile such as URIs, UUIDs - these are installed at runtime.
@@ -22,16 +50,33 @@ local function config_new(templ)
local res = table.deepcopy(templ)
local sharding = {}
res.sharding = sharding
-- Is supposed to intensify reconnects when replication and listen URIs
-- change.
res.replication_timeout = 0.1
for i, replicaset_templ in pairs(templ.sharding) do
local replicaset_uuid = uuid_next()
local replicaset_uuid = replicaset_name_to_uuid(i)
local replicas = {}
local replicaset = table.deepcopy(replicaset_templ)
replicaset.replicas = replicas
for replica_name, replica_templ in pairs(replicaset_templ.replicas) do
local replica_uuid = uuid_next()
local replica_uuid = replica_name_to_uuid(replica_name)
local replica = table.deepcopy(replica_templ)
replica.port_uri = nil
replica.port_count = nil
replica.name = replica_name
replica.uri = 'storage:storage@'..helpers.instance_uri(replica_name)

local port_count = replica_templ.port_count
local creds = 'storage:storage@'
if port_count == nil then
replica.uri = creds..helpers.instance_uri(replica_name)
else
local listen = table.new(port_count, 0)
for i = 1, port_count do
listen[i] = creds..helpers.instance_uri(replica_name..i)
end
replica.listen = listen
replica.uri = listen[replica_templ.port_uri]
end
replicas[replica_uuid] = replica
end
sharding[replicaset_uuid] = replicaset
@@ -73,6 +118,11 @@ local function storage_new(g, cfg)
box_cfg = box_cfg,
}, 'storage.lua')
g[name] = server
-- VShard specific details to use in various helper functions.
server.vtest = {
name = name,
is_storage = true,
}
g.cluster:add_server(server)

table.insert(all_servers, server)
@@ -109,6 +159,59 @@ local function storage_new(g, cfg)
end
end

--
-- Apply the config to all vshard storages in the cluster.
--
local function storage_cfg(g, cfg)
-- No support yet for dynamic node addition and removal. Only reconfig.
local fids = {}
local storages = {}
-- Map-reduce. It should make reconfig not only faster but also not depend
-- on which order would be non-blocking. For example, there might be a
-- config which makes the master hang until some replica is configured
-- first. When all are done in parallel, it won't matter.
for _, storage in pairs(g.cluster.servers) do
if storage.vtest and storage.vtest.is_storage then
table.insert(storages, storage)
table.insert(fids, storage:exec(function(cfg)
local f = fiber.new(vshard.storage.cfg, cfg, box.info.uuid)
f:set_joinable(true)
return f:id()
end, {cfg}))
end
end
local errors = {}
for i, storage in pairs(storages) do
local ok, err = storage:exec(function(fid)
return fiber.find(fid):join()
end, {fids[i]})
if not ok then
errors[storage.vtest.name] = err
end
end
t.assert_equals(errors, {}, 'storage reconfig')
end

--
-- Find first active bucket on the storage. In tests it helps not to assume
-- where the buckets are located by hardcoded numbers and uuids.
--
local function storage_first_bucket(storage)
return storage:exec(function(status)
local res = box.space._bucket.index.status:min(status)
return res ~= nil and res.id or nil
end, {const.BUCKET.ACTIVE})
end

--
-- Apply the config on the given router.
--
local function router_cfg(router, cfg)
router:exec(function(cfg)
vshard.router.cfg(cfg)
end, {cfg})
end

--
-- Create a new router in the cluster.
--
@@ -122,14 +225,33 @@ local function router_new(g, name, cfg)
g[name] = server
g.cluster:add_server(server)
server:start()
server:exec(function(cfg)
vshard.router.cfg(cfg)
end, {cfg})
router_cfg(server, cfg)
return server
end

--
-- Disconnect the router from all storages.
--
local function router_disconnect(router)
router:exec(function()
local replicasets = vshard.router.static.replicasets
for _, rs in pairs(replicasets) do
for _, r in pairs(rs.replicas) do
local c = r.conn
if c then
c:close()
end
end
end
end)
end

return {
config_new = config_new,
storage_new = storage_new,
storage_cfg = storage_cfg,
storage_first_bucket = storage_first_bucket,
router_new = router_new,
router_cfg = router_cfg,
router_disconnect = router_disconnect,
}
99 changes: 97 additions & 2 deletions test/router-luatest/router_test.lua
@@ -4,7 +4,7 @@ local vutil = require('vshard.util')
local wait_timeout = 120

local g = t.group('router')
local cluster_cfg = vtest.config_new({
local cfg_template = {
sharding = {
{
replicas = {
@@ -24,7 +24,22 @@ local cluster_cfg = vtest.config_new({
},
},
bucket_count = 100
})
}
local cluster_cfg = vtest.config_new(cfg_template)

local function callrw_get_uuid(bid, timeout)
return vshard.router.callrw(bid, 'get_uuid', {}, {timeout = timeout})
end

local function callrw_session_get(bid, key, timeout)
return vshard.router.callrw(bid, 'session_get', {key},
{timeout = timeout})
end

local function callrw_session_set(bid, key, value, timeout)
return vshard.router.callrw(bid, 'session_set', {key, value},
{timeout = timeout})
end

g.before_all(function()
vtest.storage_new(g, cluster_cfg)
@@ -261,3 +276,83 @@ g.test_map_callrw_raw = function(g)
_G.do_map = nil
end)
end

g.test_multilisten = function(g)
t.run_only_if(vutil.feature.multilisten)

local bid = vtest.storage_first_bucket(g.replica_1_a)

-- Set 2 listen ports on the master.
local new_cfg_template = table.deepcopy(cfg_template)
local rs_1_templ = new_cfg_template.sharding[1]
local rep_1_a_templ = rs_1_templ.replicas.replica_1_a
rep_1_a_templ.port_count = 2
-- Clients should use the first port.
rep_1_a_templ.port_uri = 1
local new_storage_cfg = vtest.config_new(new_cfg_template)
vtest.storage_cfg(g, new_storage_cfg)

-- Router connects to the first port.
local new_router_cfg = vtest.config_new(new_cfg_template)
vtest.router_cfg(g.router, new_router_cfg)

local rep_1_a_uuid = g.replica_1_a:instance_uuid()
local res, err = g.router:exec(callrw_get_uuid, {bid, wait_timeout})
t.assert_equals(err, nil, 'no error')
t.assert_equals(res, rep_1_a_uuid, 'went to 1_a')

-- Save a key in the session to check later for a reconnect.
res, err = g.router:exec(callrw_session_set, {bid, 1, 10, wait_timeout})
t.assert_equals(err, nil, 'no error')
t.assert(res, 'set session key')

-- The key is actually saved.
res, err = g.router:exec(callrw_session_get, {bid, 1, wait_timeout})
t.assert_equals(err, nil, 'no error')
t.assert_equals(res, 10, 'get session key')

-- Router connects to the second port. The storage's cfg is intentionally
-- unchanged.
rep_1_a_templ.port_uri = 2
new_router_cfg = vtest.config_new(new_cfg_template)
vtest.router_cfg(g.router, new_router_cfg)

res, err = g.router:exec(callrw_get_uuid, {bid, wait_timeout})
t.assert_equals(err, nil, 'no error')
t.assert_equals(res, rep_1_a_uuid, 'went to 1_a again')

-- There was a reconnect - the session is new.
res, err = g.router:exec(callrw_session_get, {bid, 1, wait_timeout})
t.assert_equals(err, nil, 'no error')
t.assert_equals(res, nil, 'no session key')

-- To confirm that the router uses the second port, shut it down on the
-- storage. The router won't be able to reconnect.
rep_1_a_templ.port_count = 1
rep_1_a_templ.port_uri = 1
new_storage_cfg = vtest.config_new(new_cfg_template)
vtest.storage_cfg(g, new_storage_cfg)
-- Force router reconnect. Otherwise the router would use the old still
-- alive connection even though the original listening socket is closed
-- above.
vtest.router_disconnect(g.router)

res, err = g.router:exec(callrw_get_uuid, {bid, 0.05})
t.assert_equals(res, nil, 'rw failed when second port was shut down')
-- Code can be anything really. Can't check it reliably not depending on OS.
t.assert_covers(err, {type = 'ClientError'}, 'got error')

-- Make the router connect to the first port while it still thinks there
-- are two ports.
rep_1_a_templ.port_count = 2
rep_1_a_templ.port_uri = 1
new_router_cfg = vtest.config_new(new_cfg_template)
vtest.router_cfg(g.router, new_router_cfg)
res, err = g.router:exec(callrw_get_uuid, {bid, wait_timeout})
t.assert_equals(err, nil, 'no error')
t.assert_equals(res, rep_1_a_uuid, 'went to 1_a again')

-- Restore everything back.
vtest.storage_cfg(g, cluster_cfg)
vtest.router_cfg(g.router, cluster_cfg)
end