Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,15 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.

### Fixed

## [1.4.5] - 2025-09-22

Race condition between utubettl_fiber_iteration fiber and other operation.

### Fixed

- Erasing the task entry from _queue_taken_2 in the `utubettl` driver by
a race condition between utubettl_fiber_iteration fiber and other operation.

## [1.4.4] - 2025-05-26

The patch release fixes incorrect behavior of the utubettl driver with enabled
Expand Down
21 changes: 9 additions & 12 deletions queue/abstract/driver/utubettl.lua
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,6 @@ local function utubettl_fiber_iteration(self, processed)
local estimated = util.MAX_TIMEOUT

local commit_func = begin_if_not_in_txn(self)
local commited = false

-- delayed tasks
task = self.space.index.watch:min(delayed_state)
Expand All @@ -221,8 +220,6 @@ local function utubettl_fiber_iteration(self, processed)
if self.ready_space_mode then
update_ready(self, task[i_id], task[i_utube], task[i_pri])
end
commit_func()
commited = true

self:on_task_change(task, 'delayed')
estimated = 0
Expand All @@ -231,12 +228,13 @@ local function utubettl_fiber_iteration(self, processed)
estimated = tonumber(task[i_next_event] - now) / 1000000
end
end
if not commited then
commit_func()
end

commit_func()

-- ttl tasks
for _, state in pairs(ttl_states) do
commit_func = begin_if_not_in_txn(self)

task = self.space.index.watch:min{ state }
if task ~= nil and task[i_status] == state then
if now >= task[i_next_event] then
Expand All @@ -249,10 +247,12 @@ local function utubettl_fiber_iteration(self, processed)
estimated = et < estimated and et or estimated
end
end

commit_func()
end

commit_func = begin_if_not_in_txn(self)
commited = false

-- ttr tasks
task = self.space.index.watch:min(ttr_state)
if task and task[i_status] == state.TAKEN then
Expand All @@ -265,8 +265,6 @@ local function utubettl_fiber_iteration(self, processed)
if self.ready_space_mode then
put_ready(self, task[i_id], task[i_utube], task[i_pri])
end
commit_func()
commited = true

self:on_task_change(task, 'ttr')
estimated = 0
Expand All @@ -276,9 +274,8 @@ local function utubettl_fiber_iteration(self, processed)
estimated = et < estimated and et or estimated
end
end
if not commited then
commit_func()
end

commit_func()

if estimated > 0 or processed > 1000 then
-- free refcounter
Expand Down