Skip to content
This repository has been archived by the owner on Feb 11, 2022. It is now read-only.

Commit

Permalink
Remove replication=false, bug fixes
Browse files Browse the repository at this point in the history
* Use test_run cluster utils to manage shard nodes. Always use
replication to maintain redundancy.
* Validate shard.init options.

Part of #38
  • Loading branch information
Gerold103 committed Oct 18, 2017
1 parent 6ab67ee commit 2e9e86b
Show file tree
Hide file tree
Showing 87 changed files with 655 additions and 2,148 deletions.
7 changes: 0 additions & 7 deletions shard/connpool.lua
Original file line number Diff line number Diff line change
Expand Up @@ -148,17 +148,14 @@ local function monitor_fiber(self)
break
end
end
print('dead: %d', dead)
for k, v in pairs(self.heartbeat_state) do
-- kill only if DEAD_TIMEOUT become in all servers
if k ~= uri and (v[uri] == nil or v[uri].try < self.DEAD_TIMEOUT) then
log.warn(v[uri].try)
log.debug("%s is alive", uri)
dead = false
break
end
end
print('dead: %d', dead)
if dead then
server.conn:close()
self.epoch_counter = self.epoch_counter + 1
Expand Down Expand Up @@ -200,8 +197,6 @@ end

-- heartbeat table and opinions management
local function update_heartbeat(self, uri, response, status)
log.warn('update heartbeat')
log.warn(status)
-- set or update opinions and timestamp
if self.self_server == nil then
return
Expand Down Expand Up @@ -233,7 +228,6 @@ local function heartbeat_fiber(self)
log.debug("checking %s", uri)

if server.conn == nil then
log.warn('set -1 to all')
for _, opinion in pairs(self.heartbeat_state[server.uri]) do
opinion.ts = fiber.time()
opinion.try = INFINITY_MIN
Expand Down Expand Up @@ -340,7 +334,6 @@ local function connect(self, id, server)
break
end
conn:close()
log.warn(" - %s - server check failure", server.uri)
fiber.sleep(1)
end
end
Expand Down
47 changes: 42 additions & 5 deletions shard/init.lua
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,6 @@ local STATE_INPROGRESS = 1
local STATE_HANDLED = 2

local init_complete = false
local configuration = {}
local shard_obj

-- 1.6 and 1.7 netbox compat
Expand Down Expand Up @@ -497,9 +496,7 @@ local function request(self, space, operation, tuple_id, ...)

for _, server in ipairs(nodes) do
table.insert(result, single_call(self, space, server, operation, ...))
if configuration.replication == true then
break
end
break
end
return result
end
Expand Down Expand Up @@ -728,9 +725,49 @@ local function enable_operations()
})
end

local cfg_template = {
servers = 'table',
login = 'string',
password = 'string',
monitor = 'boolean',
pool_name = 'string',
redundancy = 'number',
rsd_max_rps = 'number',
binary = 'number'
}

local cfg_default = {
servers = {},
monitor = true,
pool_name = 'sharding_pool',
redundancy = 2,
rsd_max_rps = 1000,
}

local function check_cfg(cfg)
cfg = table.deepcopy(cfg)
-- Set default options.
for k,v in pairs(cfg_default) do
if cfg[k] == nil then
cfg[k] = cfg_default[k]
end
end
-- Check specified options.
for k,v in pairs(cfg) do
if cfg_template[k] == nil then
error("Unknown cfg option "..k)
end
if cfg_template[k] ~= type(v) then
error("Incorrect type of cfg option "..k..": expected "..
cfg_template[k])
end
end
return cfg
end

-- init shard, connect with servers
local function init(cfg, callback)
configuration = cfg
cfg = check_cfg(cfg)
log.info('Sharding initialization started...')
-- set constants
pool.REMOTE_TIMEOUT = shard_obj.REMOTE_TIMEOUT
Expand Down
2 changes: 1 addition & 1 deletion test-run
3 changes: 3 additions & 0 deletions test/join/box.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
box.cfg{}

require('console').listen(os.getenv('ADMIN'))
36 changes: 0 additions & 36 deletions test/join/join1.lua

This file was deleted.

35 changes: 0 additions & 35 deletions test/join/join2.lua

This file was deleted.

1 change: 1 addition & 0 deletions test/join/lua
50 changes: 0 additions & 50 deletions test/join/master.lua

This file was deleted.

53 changes: 53 additions & 0 deletions test/join/master0.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
#!/usr/bin/env tarantool
shard = require('shard')
os = require('os')
fiber = require('fiber')
util = require('util')

local cfg = {
servers = {
{ uri = util.instance_uri(0), zone = '0' },
{ uri = util.instance_uri(1), zone = '0' },
{ uri = util.instance_uri(2), zone = '0' },
{ uri = util.instance_uri(3), zone = '1' },
{ uri = util.instance_uri(4), zone = '1' },
{ uri = util.instance_uri(5), zone = '1' },
},
login = 'tester',
password = 'pass',
monitor = true,
redundancy = 2,
binary = util.instance_port(util.INSTANCE_ID),
}

require('console').listen(os.getenv('ADMIN'))

local replication = {}
if util.INSTANCE_ID <= 2 then
replication = {
util.instance_uri(util.INSTANCE_ID),
util.instance_uri(util.INSTANCE_ID + 3)
}
else
replication = {
util.instance_uri(util.INSTANCE_ID - 3),
util.instance_uri(util.INSTANCE_ID)
}
end

box.cfg {
listen = cfg.binary,
replication = replication,
}
util.create_replica_user(cfg)

if util.INSTANCE_ID <= 2 then
local demo = box.schema.create_space('demo', {if_not_exists = true})
demo:create_index('primary', {type = 'tree', parts = {1, 'unsigned'}, if_not_exists = true})
else
while box.space.demo == nil or box.space.demo.index[0] == nil do
fiber.sleep(0.01)
end
end

fiber.create(function() shard.init(cfg) end)
44 changes: 1 addition & 43 deletions test/join/master1.lua
Original file line number Diff line number Diff line change
@@ -1,44 +1,2 @@
#!/usr/bin/env tarantool
shard = require('shard')
os = require('os')
fiber = require('fiber')

local cfg = {
servers = {
{ uri = 'localhost:33130', zone = '0' };
{ uri = 'localhost:33131', zone = '0' };
{ uri = 'localhost:33132', zone = '0' };
{ uri = 'localhost:33133', zone = '1' };
{ uri = 'localhost:33134', zone = '1' };
{ uri = 'localhost:33135', zone = '1' };
};
login = 'tester';
password = 'pass';
monitor = false;
redundancy = 2;
replication = true;
binary = 33131;
}

box.cfg {
slab_alloc_arena = 0.1;
listen = cfg.binary;
custom_proc_title = "master"
}

require('console').listen(os.getenv('ADMIN'))

if not box.space.demo then
box.schema.user.create(cfg.login, { password = cfg.password })
box.schema.user.grant(cfg.login, 'read,write,execute', 'universe')
box.schema.user.grant('guest', 'read,write,execute', 'universe')

local demo = box.schema.create_space('demo')
demo:create_index('primary', {type = 'tree', parts = {1, 'num'}})
end

-- init shards
fiber.create(function()
shard.init(cfg)
end)

dofile('master0.lua')
44 changes: 1 addition & 43 deletions test/join/master2.lua
Original file line number Diff line number Diff line change
@@ -1,44 +1,2 @@
#!/usr/bin/env tarantool
shard = require('shard')
os = require('os')
fiber = require('fiber')

local cfg = {
servers = {
{ uri = 'localhost:33130', zone = '0' };
{ uri = 'localhost:33131', zone = '0' };
{ uri = 'localhost:33132', zone = '0' };
{ uri = 'localhost:33133', zone = '1' };
{ uri = 'localhost:33134', zone = '1' };
{ uri = 'localhost:33135', zone = '1' };
};
login = 'tester';
password = 'pass';
redundancy = 2;
monitor = false;
replication = true;
binary = 33132;
}

box.cfg {
slab_alloc_arena = 0.1;
listen = cfg.binary;
custom_proc_title = "master"
}

require('console').listen(os.getenv('ADMIN'))

if not box.space.demo then
box.schema.user.create(cfg.login, { password = cfg.password })
box.schema.user.grant(cfg.login, 'read,write,execute', 'universe')
box.schema.user.grant('guest', 'read,write,execute', 'universe')

local demo = box.schema.create_space('demo')
demo:create_index('primary', {type = 'tree', parts = {1, 'num'}})
end

-- init shards
fiber.create(function()
shard.init(cfg)
end)

dofile('master0.lua')
Loading

0 comments on commit 2e9e86b

Please sign in to comment.