Skip to content

Commit

Permalink
Added some comments and removed duplicate code
Browse files Browse the repository at this point in the history
Remove expirationd_kill_task, duplicate of code.
Comments, readme and responses to errors are presented
in a more uniform form. Added additional comments for
easier understanding of what is happening. Delete `...`,
can't be jitted. Using outer double quotes only.

Needed for: #50
  • Loading branch information
ArtDu authored and ligurio committed Jul 6, 2021
1 parent 2ec3786 commit 900f389
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 73 deletions.
39 changes: 17 additions & 22 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ being deleted into some other space.
``` lua
box.cfg{}
space = box.space.old
job_name = 'clean_all'
expirationd = require('expirationd')
job_name = "clean_all"
expirationd = require("expirationd")
function is_expired(args, tuple)
return true
end
Expand All @@ -31,35 +31,30 @@ expirationd.start(job_name, space.id, is_expired, {

### `expirationd.start (name, space_id, is_tuple_expired, options)`

Run a named task
Run a scheduled task to check and process (expire) tuples in a given space.

* `name` - task name
* `space_id` - space to look in for expired tuples
* `is_tuple_expired` - a function, must accept tuple and return true/false
(is tuple expired or not), receives `(args, tuple)` as arguments
opt
* `options` -- (table with named options, may be nil)
* `process_expired_tuple` - applied to expired tuples, receives `(space_id, args, tuple)`
as arguments. Can be nil: by default tuples are removed
* `args` - passed to `is_tuple_expired()` and `process_expired_tuple()` as additional context
* `tuples_per_iteration` - number of tuples will be checked by one iteration
* `full_scan_time` - time required for full index scan (in seconds)
* `iteration_delay` - max sleep time between iterations (in seconds)
* `full_scan_delay` - sleep time between full scans (in seconds)
* `on_full_scan_start` - call function on starting full scan iteration
Receives `(task)` as arguments.
* `on_full_scan_complete` - call function on complete full scan iteration.
Called after `on_full_scan_success` or `on_full_scan_error`.
Receives `(task)` as arguments.
* `on_full_scan_success` - call function on success full scan iteration
Receives `(task)` as arguments.
* `on_full_scan_error` - call function on error full scan iteration
Receives `(task, error)` as arguments.
* `force` - run, even on replica
* `process_expired_tuple` - Applied to expired tuples, receives (space_id, args, tuple) as arguments.
Can be nil: by default, tuples are removed.
* `tuples_per_iteration` - Number of tuples to check in one batch (iteration). Default is 1024.
* `on_full_scan_start` - Function to call before starting a tuple scan.
* `on_full_scan_complete` - Function to call after completing a full scan.
* `on_full_scan_success` - Function to call after successfully completing a full scan.
* `on_full_scan_error` - Function to call after terminating a full scan due to an error.
* `args` - Passed to is_tuple_expired and process_expired_tuple() as an additional context.
* `full_scan_time` - Time required for a full index scan (in seconds).
* `iteration_delay` - Max sleep time between batches (in seconds).
* `full_scan_delay` - Sleep time between full scans (in seconds).
* `force` - Run task even on replica.


### `expirationd.kill (name)`

Kill an existing task with name 'name'
Kill an existing task with name "name"

* `name` - task's name

Expand Down
105 changes: 54 additions & 51 deletions expirationd.lua
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@
-- local support functions
-- ========================================================================= --

local fun = require('fun')
local log = require('log')
local fiber = require('fiber')
local fun = require("fun")
local log = require("log")
local fiber = require("fiber")

-- get fiber id function
local function get_fiber_id(fiber)
Expand Down Expand Up @@ -45,7 +45,7 @@ local constants = {
-- factor for recalculation of vinyl space size
default_vinyl_assumed_space_len_factor = 2,
-- default function on full scan
default_on_full_scan = function(...) end
default_on_full_scan = function() end,
}

-- ========================================================================= --
Expand All @@ -56,13 +56,15 @@ local constants = {
-- Task fibers
-- ------------------------------------------------------------------------- --

-- get all fields in primary key(composite possible) from tuple
local function construct_key(space_id, tuple)
return fun.map(
function(x) return tuple[x.fieldno] end,
box.space[space_id].index[0].parts
):totable()
end

-- do expiration process on tuple
local function expiration_process(task, tuple)
task.checked_tuples_count = task.checked_tuples_count + 1
if task.is_tuple_expired(task.args, tuple) then
Expand All @@ -71,13 +73,15 @@ local function expiration_process(task, tuple)
end
end

-- yield for some time
local function suspend_basic(scan_space, task, len)
local delay = (task.tuples_per_iteration * task.full_scan_time)
delay = math.min(delay / len, task.iteration_delay)
fiber.sleep(delay)
end

local function suspend(scan_space, task)
-- Return the number of tuples in the space
local space_len = scan_space:len()
if space_len > 0 then
suspend_basic(scan_space, task, space_len)
Expand All @@ -86,7 +90,7 @@ end

local function tree_index_iter(scan_space, task)
-- iteration with GT iterator
local params = {iterator = 'GT', limit = task.tuples_per_iteration}
local params = {iterator = "GT", limit = task.tuples_per_iteration}
local last_id
local tuples = scan_space.index[0]:select({}, params)
while #tuples > 0 do
Expand Down Expand Up @@ -120,7 +124,7 @@ local function default_do_worker_iteration(task)
local index_type = scan_space.index[0].type

-- full index scan loop
if index_type == 'HASH' then
if index_type == "HASH" then
hash_index_iter(scan_space, task)
else
tree_index_iter(scan_space, task)
Expand All @@ -133,7 +137,7 @@ local function vinyl_do_worker_iteration(task)
local checked_tuples_count = 0
local space_len = task.vinyl_assumed_space_len

local params = {iterator = 'GT', limit = task.tuples_per_iteration}
local params = {iterator = "GT", limit = task.tuples_per_iteration}
local tuples = scan_space.index[0]:select({}, params)
while true do
local tuple_cnt = #tuples
Expand Down Expand Up @@ -186,6 +190,7 @@ local function guardian_loop(task)
fiber.name(string.format("guardian of %q", task.name), { truncate = true })

while true do
-- if fiber doesn't exist
if get_fiber_id(task.worker_fiber) == 0 then
-- create worker fiber
task.worker_fiber = fiber.create(worker_loop, task)
Expand All @@ -205,7 +210,7 @@ end
-- * task:start() -- start task
-- * task:stop() -- stop task
-- * task:restart() -- restart task
-- * task:kill() -- delete task and restart
-- * task:kill() -- stop task and delete
-- * task:statistics() -- return table with statistics
local Task_methods = {
start = function (self)
Expand All @@ -215,14 +220,14 @@ local Task_methods = {
stop = function (self)
if (get_fiber_id(self.guardian_fiber) ~= 0) then
self.guardian_fiber:cancel()
while self.guardian_fiber:status() ~= 'dead' do
while self.guardian_fiber:status() ~= "dead" do
fiber.sleep(0.01)
end
self.guardian_fiber = nil
end
if (get_fiber_id(self.worker_fiber) ~= 0) then
self.worker_fiber:cancel()
while self.worker_fiber:status() ~= 'dead' do
while self.worker_fiber:status() ~= "dead" do
fiber.sleep(0.01)
end
self.worker_fiber = nil
Expand Down Expand Up @@ -269,7 +274,7 @@ local function create_task(name)
on_full_scan_error = constants.default_on_full_scan,
on_full_scan_success = constants.default_on_full_scan,
on_full_scan_start = constants.default_on_full_scan,
on_full_scan_complete = constants.default_on_full_scan
on_full_scan_complete = constants.default_on_full_scan,
}, { __index = Task_methods })
return task
end
Expand Down Expand Up @@ -298,27 +303,28 @@ end
-- Expiration daemon management functions
-- ========================================================================= --

-- Run a named task
-- Run a scheduled task to check and process (expire) tuples in a given space.
--
-- params:
-- name -- task name
-- space_id -- space to look in for expired tuples
-- is_tuple_expired -- a function, must accept tuple and return
-- true/false (is tuple expired or not),
-- receives (args, tuple) as arguments
-- options = { -- (table with named options)
-- * process_expired_tuple -- applied to expired tuples, receives
-- (space_id, args, tuple) as arguments
-- * on_full_scan_start -- call function on starting full scan iteration
-- * on_full_scan_complete -- call function on complete full scan iteration
-- * on_full_scan_success -- call function on success full scan iteration
-- * on_full_scan_error -- call function on error full scan iteration
-- * args -- passed to is_tuple_expired and
-- process_expired_tuple() as additional context
-- * tuples_per_iteration -- number of tuples will be checked by one iteration
-- * full_scan_time -- time required for full index scan (in seconds)
-- * iteration_delay -- max sleep time between iterations (in seconds)
-- * full_scan_delay -- sleep time between full scans (in seconds)
-- * force -- run task even on replica
-- * process_expired_tuple -- Applied to expired tuples, receives (space_id, args, tuple) as arguments;
-- can be nil: by default, tuples are removed.
-- * on_full_scan_start -- Function to call before starting a full scan iteration.
-- * on_full_scan_complete -- Function to call after completing a full scan iteration.
-- * on_full_scan_success -- Function to call after successfully completing a full scan iteration.
-- * on_full_scan_error -- Function to call after terminating a full scan due to an error.
-- * args -- Passed to is_tuple_expired and
-- process_expired_tuple() as additional context.
-- * tuples_per_iteration -- Number of tuples to check in one batch (iteration); default is 1024.
-- * full_scan_time -- Time required for a full index scan (in seconds).
-- * iteration_delay -- Max sleep time between iterations (in seconds).
-- * full_scan_delay -- Sleep time between full scans (in seconds).
-- * force -- Run task even on replica.
-- }
local function expirationd_run_task(name, space_id, is_tuple_expired, options)
if name == nil then
Expand Down Expand Up @@ -347,7 +353,7 @@ local function expirationd_run_task(name, space_id, is_tuple_expired, options)
task.is_tuple_expired = is_tuple_expired

-- optional params
if options ~= nil and type(options) ~= 'table' then
if options ~= nil and type(options) ~= "table" then
error("options must be table or not defined")
end
options = options or {}
Expand All @@ -365,84 +371,84 @@ local function expirationd_run_task(name, space_id, is_tuple_expired, options)
-- check tuples per iteration (not required)
if options.tuples_per_iteration ~= nil then
if options.tuples_per_iteration <= 0 then
error("invalid tuples per iteration parameter")
error("Invalid tuples per iteration parameter")
end
task.tuples_per_iteration = options.tuples_per_iteration
end

-- check full scan time
if options.full_scan_time ~= nil then
if options.full_scan_time <= 0 then
error("invalid full scan time")
error("Invalid full scan time")
end
task.full_scan_time = options.full_scan_time
end

if options.force ~= nil then
if type(options.force) ~= 'boolean' then
if type(options.force) ~= "boolean" then
error("Invalid type of force value")
end
task.force = options.force
end

if options.vinyl_assumed_space_len ~= nil then
if type(options.vinyl_assumed_space_len) ~= 'number' then
if type(options.vinyl_assumed_space_len) ~= "number" then
error("Invalid type of vinyl_assumed_space_len value")
end
task.vinyl_assumed_space_len = options.vinyl_assumed_space_len
end

if options.vinyl_assumed_space_len_factor ~= nil then
if type(options.vinyl_assumed_space_len_factor) ~= 'number' then
if type(options.vinyl_assumed_space_len_factor) ~= "number" then
error("Invalid type of vinyl_assumed_space_len_factor value")
end
task.vinyl_assumed_space_len_factor = options.vinyl_assumed_space_len_factor
end

if box.space[task.space_id].engine == 'vinyl' then
if box.space[task.space_id].engine == "vinyl" then
task.do_worker_iteration = vinyl_do_worker_iteration
else
task.do_worker_iteration = default_do_worker_iteration
end

if options.iteration_delay ~= nil then
if type(options.iteration_delay) ~= 'number' then
error("invalid type of iteration_delay value")
if type(options.iteration_delay) ~= "number" then
error("Invalid type of iteration_delay value")
end
task.iteration_delay = options.iteration_delay
end

if options.full_scan_delay ~= nil then
if type(options.full_scan_delay) ~= 'number' then
error("invalid type of full_scan_delay value")
if type(options.full_scan_delay) ~= "number" then
error("Invalid type of full_scan_delay value")
end
task.full_scan_delay = options.full_scan_delay
end

if options.on_full_scan_start ~= nil then
if type(options.on_full_scan_start) ~= 'function' then
error("invalid type of on_full_scan_start is not function")
if type(options.on_full_scan_start) ~= "function" then
error("Invalid type of on_full_scan_start, expected function")
end
task.on_full_scan_start = options.on_full_scan_start
end

if options.on_full_scan_success ~= nil then
if type(options.on_full_scan_success) ~= 'function' then
error("invalid type of on_full_scan_success is not function")
if type(options.on_full_scan_success) ~= "function" then
error("Invalid type of on_full_scan_success, expected function")
end
task.on_full_scan_success = options.on_full_scan_success
end

if options.on_full_scan_complete ~= nil then
if type(options.on_full_scan_complete) ~= 'function' then
error("invalid type of on_full_scan_complete is not function")
if type(options.on_full_scan_complete) ~= "function" then
error("Invalid type of on_full_scan_complete, expected function")
end
task.on_full_scan_complete = options.on_full_scan_complete
end

if options.on_full_scan_error ~= nil then
if type(options.on_full_scan_error) ~= 'function' then
error("invalid type of on_full_scan_error is not function")
if type(options.on_full_scan_error) ~= "function" then
error("Invalid type of on_full_scan_error, expected function")
end
task.on_full_scan_error = options.on_full_scan_error
end
Expand Down Expand Up @@ -502,10 +508,7 @@ local function expirationd_task_stats(name)
return retval
end

-- kill task
local function expirationd_kill_task(name)
return get_task(name):kill()
end


-- get task by name
local function expirationd_get_task(name)
Expand All @@ -517,15 +520,15 @@ end
-- * require new expirationd
-- * restart all tasks
local function expirationd_update()
local expd_prev = require('expirationd')
local expd_prev = require("expirationd")
table.clear(expd_prev)
setmetatable(expd_prev, {
__index = function(name)
error("Wait until update is done before using expirationd", 2)
end
})
package.loaded['expirationd'] = nil
local expd_new = require('expirationd')
package.loaded["expirationd"] = nil
local expd_new = require("expirationd")
local tmp_task_list = task_list; task_list = {}
for name, task in pairs(tmp_task_list) do
task:kill()
Expand Down

0 comments on commit 900f389

Please sign in to comment.