Skip to content

Commit

Permalink
Atomic iteration
Browse files Browse the repository at this point in the history
One transaction per batch option.
With task:kill, the batch with transactions will be finalized and only
after that the fiber will complete its work

Needed for: tarantool#50
  • Loading branch information
ArtDu committed May 10, 2022
1 parent 9513058 commit e895a66
Show file tree
Hide file tree
Showing 4 changed files with 219 additions and 1 deletion.
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ Run a scheduled task to check and process (expire) tuples in a given space.
The index value may be a single value, if the index consists of one field, a tuple with the index key parts, or a function which returns such value.
If omitted or nil, all tuples will be checked.
* `tuples_per_iteration` - Number of tuples to check in one batch (iteration). Default is 1024.
* `atomic_iteration` - Boolean, false (default) to process each tuple as a single transaction.
True to process tuples from each batch in a single transaction.
* `process_while` - Function to call before checking each tuple.
If it returns false, the current tuple scan task finishes.
* `iterate_with` - Function which returns an iterator object which provides tuples to check, considering the start_key, process_while and other options.
Expand Down
42 changes: 41 additions & 1 deletion expirationd.lua
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ local constants = {
process_while = function() return true end,
-- default iterating over the loop will go in ascending index
iterator_type = "ALL",
-- default atomic_iteration is false, batch of items doesn't include in one transaction
atomic_iteration = false,
}

-- ========================================================================= --
Expand Down Expand Up @@ -99,13 +101,22 @@ local function default_do_worker_iteration(task)
local space_len = task.vinyl_assumed_space_len
local checked_tuples_count = 0
local vinyl_checked_tuples_count = 0
if task.atomic_iteration then
box.begin()
end
for _, tuple in task:iterate_with() do
checked_tuples_count = checked_tuples_count + 1
vinyl_checked_tuples_count = vinyl_checked_tuples_count + 1
expiration_process(task, tuple)
-- find out if the worker can go to sleep
-- if the batch is full
if checked_tuples_count >= task.tuples_per_iteration then
if task.atomic_iteration then
box.commit()
if task.worker_canceled then
return true
end
end
checked_tuples_count = 0
if box.space[task.space_id].engine == "vinyl" then
if vinyl_checked_tuples_count > space_len then
Expand All @@ -115,8 +126,14 @@ local function default_do_worker_iteration(task)
else
suspend(task)
end
if task.atomic_iteration then
box.begin()
end
end
end
if task.atomic_iteration then
box.commit()
end
if box.space[task.space_id].engine == "vinyl" then
task.vinyl_assumed_space_len = vinyl_checked_tuples_count
end
Expand All @@ -127,9 +144,15 @@ local function worker_loop(task)
fiber.name(string.format("worker of %q", task.name), { truncate = true })

while true do
if task.worker_canceled then
fiber.self():cancel()
end
if (box.cfg.replication_source == nil and box.cfg.replication == nil) or task.force then
task.on_full_scan_start(task)
local state, err = pcall(task.do_worker_iteration, task)
if task.worker_canceled then
fiber.self():cancel()
end
if state then
task.on_full_scan_success(task)
else
Expand All @@ -138,6 +161,7 @@ local function worker_loop(task)

task.on_full_scan_complete(task)
if not state then
box.rollback()
error(err)
end
end
Expand Down Expand Up @@ -188,7 +212,11 @@ local Task_methods = {
self.guardian_fiber = nil
end
if (get_fiber_id(self.worker_fiber) ~= 0) then
self.worker_fiber:cancel()
if self.atomic_iteration then
self.worker_canceled = true
else
self.worker_fiber:cancel()
end
while self.worker_fiber:status() ~= "dead" do
fiber.sleep(0.01)
end
Expand Down Expand Up @@ -229,6 +257,7 @@ local function create_task(name)
args = nil,
index = nil,
iterate_with = nil,
worker_canceled = false,
iteration_delay = constants.max_delay,
full_scan_delay = constants.max_delay,
tuples_per_iteration = constants.default_tuples_per_iteration,
Expand All @@ -242,6 +271,7 @@ local function create_task(name)
start_key = constants.start_key,
process_while = constants.process_while,
iterator_type = constants.iterator_type,
atomic_iteration = constants.atomic_iteration,
}, { __index = Task_methods })
return task
end
Expand Down Expand Up @@ -303,6 +333,8 @@ end
-- or a function which returns such value;
-- if omitted or nil, all tuples will be checked.
-- * tuples_per_iteration -- Number of tuples to check in one batch (iteration); default is 1024.
-- * atomic_iteration -- Boolean, false (default) to process each tuple as a single transaction;
-- true to process tuples from each batch in a single transaction.
-- * process_while -- Function to call before checking each tuple;
-- if it returns false, the task will stop until next full scan.
-- * iterate_with -- Function which returns an iterator object which provides tuples to check,
Expand Down Expand Up @@ -402,6 +434,14 @@ local function expirationd_run_task(name, space_id, is_tuple_expired, options)
task.process_while = options.process_while
end

-- check transaction option
if options.atomic_iteration ~= nil then
if type(options.atomic_iteration) ~= "boolean" then
error("Invalid type of atomic_iteration, expected boolean")
end
task.atomic_iteration = options.atomic_iteration
end

-- check iterate_with
if options.iterate_with ~= nil then
if type(options.iterate_with) ~= "function" then
Expand Down
1 change: 1 addition & 0 deletions test/helper.lua
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ t.before_suite(function()
wal_dir = t.datadir,
memtx_dir = t.datadir,
vinyl_dir = t.datadir,
vinyl_memory = 1024,
}

local tree = box.schema.create_space("tree", { if_not_exists = true })
Expand Down
175 changes: 175 additions & 0 deletions test/unit/atomic_iteration_test.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
local fiber = require("fiber")
local expirationd = require("expirationd")
local t = require("luatest")
local g = t.group("atomic_iteration")

local helpers = require("test.helper")

g.before_all(function()
helpers.init_spaces(g)
end)

g.after_each(function()
helpers.truncate_spaces(g)
end)

function g.test_passing()
local task = expirationd.start("clean_all", g.tree.id, helpers.is_expired_true)
t.assert_equals(task.atomic_iteration, false)
task:kill()

task = expirationd.start("clean_all", g.tree.id, helpers.is_expired_true,
{atomic_iteration = true})
t.assert_equals(task.atomic_iteration, true)
task:kill()

-- errors
t.assert_error_msg_content_equals("Invalid type of atomic_iteration, expected boolean",
expirationd.start, "clean_all", g.tree.id, helpers.is_expired_true,
{atomic_iteration = 1})
end

function g.test_memtx()
helpers.iteration_result = {}

local transactions = {}
local function f(iterator)
local transaction = {}
-- old / new_tuple is not passed for vinyl
for request_number, old_tuple, new_tuple, space_id in iterator() do
table.insert(transaction, old_tuple:totable())
end
table.insert(transactions, transaction)
end

local true_box_begin = box.begin

-- mock box.begin
box.begin = function ()
true_box_begin()
box.on_commit(f)
end

for _, space in pairs({g.tree, g.hash}) do
-- tuples expired in one atomic_iteration
space:insert({1, "3"})
space:insert({2, "2"})
space:insert({3, "1"})


local task = expirationd.start("clean_all", space.id, helpers.is_expired_debug,
{atomic_iteration = true})
-- wait for tuples expired
helpers.retrying({}, function()
if space.index[0].type == "HASH" then
t.assert_equals(helpers.iteration_result, {{3, "1"}, {2, "2"}, {1, "3"}})
else
t.assert_equals(helpers.iteration_result, {{1, "3"}, {2, "2"}, {3, "1"}})
end
end)
task:kill()
helpers.iteration_result = {}

-- check out three row transaction
if space.index[0].type == "HASH" then
t.assert_equals(transactions, {
{ {3, "1"}, {2, "2"}, {1, "3"} }
})
else
t.assert_equals(transactions, {
{ {1, "3"}, {2, "2"}, {3, "1"} }
})
end
transactions = {}

-- tuples expired in two atomic_iteration
space:insert({1, "3"})
space:insert({2, "2"})
space:insert({3, "1"})

task = expirationd.start("clean_all", space.id, helpers.is_expired_debug,
{atomic_iteration = true, tuples_per_iteration = 2})
-- wait for tuples expired
-- 2 seconds because suspend will be yield in task
helpers.retrying({}, function()
if space.index[0].type == "HASH" then
t.assert_equals(helpers.iteration_result, {{3, "1"}, {2, "2"}, {1, "3"}})
else
t.assert_equals(helpers.iteration_result, {{1, "3"}, {2, "2"}, {3, "1"}})
end
end)
task:kill()
helpers.iteration_result = {}

if space.index[0].type == "HASH" then
t.assert_equals(transactions, {
{ {3, "1"}, {2, "2"} }, -- check two row transaction
{ {1, "3"} } -- check single row transactions
})
else
t.assert_equals(transactions, {
{ {1, "3"}, {2, "2"} }, -- check two row transaction
{ {3, "1"} } -- check single row transactions
})
end

transactions = {}
end

-- unmock
box.begin = true_box_begin
end

-- it's not check tarantool or vinyl as engine
-- just check expirationd task continue work after conflicts
function g.test_mvcc_vinyl_tx_conflict()
for i = 1,10 do
g.vinyl:insert({i, tostring(i), nil, nil, 0})
end

local updaters = {}
for i = 1,10 do
local updater = fiber.create(function()
fiber.name(string.format("updater of %d", i), { truncate = true })
while true do
g.vinyl:update({i}, { {"+", 5, 1} })
fiber.yield()
end
end)
table.insert(updaters, updater)
end

local task = expirationd.start("clean_all", g.vinyl.id, helpers.is_expired_debug,
{atomic_iteration = true})

-- wait for tuples expired
fiber.sleep(3)

for i = 1,10 do
updaters[i]:cancel()
end

helpers.retrying({}, function()
t.assert_equals(g.vinyl:select(), {})
end)
t.assert(box.stat.vinyl().tx.conflict > 0)
t.assert_equals(box.stat.vinyl().tx.conflict, box.stat.vinyl().tx.rollback)
t.assert_equals(box.stat.vinyl().tx.transactions, 0)
task:kill()
end

function g.test_kill_task()
for i = 1,1024*10 do
g.tree:insert({i, tostring(i)})
end

local task = expirationd.start("clean_all", g.tree.id, helpers.is_expired_debug,
{atomic_iteration = true})

task:kill()
t.assert(g.tree:count() > 0)
t.assert(g.tree:count() % 1024 == 0)

-- return to default value
box.cfg{vinyl_memory = 134217728}
end

0 comments on commit e895a66

Please sign in to comment.