diff --git a/CHANGELOG.md b/CHANGELOG.md index 07ac4d4..a24ea07 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,15 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0. ### Fixed +## [1.4.5] - 2025-09-22 + +Race condition between utubettl_fiber_iteration fiber and other operation. + +### Fixed + +- Erasing the task entry from _queue_taken_2 in the `utubettl` driver by + a race condition between utubettl_fiber_iteration fiber and other operation. + ## [1.4.4] - 2025-05-26 The patch release fixes incorrect behavior of the utubettl driver with enabled diff --git a/queue/abstract/driver/utubettl.lua b/queue/abstract/driver/utubettl.lua index 7b726ba..f8aed34 100644 --- a/queue/abstract/driver/utubettl.lua +++ b/queue/abstract/driver/utubettl.lua @@ -207,7 +207,6 @@ local function utubettl_fiber_iteration(self, processed) local estimated = util.MAX_TIMEOUT local commit_func = begin_if_not_in_txn(self) - local commited = false -- delayed tasks task = self.space.index.watch:min(delayed_state) @@ -221,8 +220,6 @@ local function utubettl_fiber_iteration(self, processed) if self.ready_space_mode then update_ready(self, task[i_id], task[i_utube], task[i_pri]) end - commit_func() - commited = true self:on_task_change(task, 'delayed') estimated = 0 @@ -231,12 +228,13 @@ local function utubettl_fiber_iteration(self, processed) estimated = tonumber(task[i_next_event] - now) / 1000000 end end - if not commited then - commit_func() - end + + commit_func() -- ttl tasks for _, state in pairs(ttl_states) do + commit_func = begin_if_not_in_txn(self) + task = self.space.index.watch:min{ state } if task ~= nil and task[i_status] == state then if now >= task[i_next_event] then @@ -249,10 +247,12 @@ local function utubettl_fiber_iteration(self, processed) estimated = et < estimated and et or estimated end end + + commit_func() end commit_func = begin_if_not_in_txn(self) - commited = false + -- ttr tasks task = self.space.index.watch:min(ttr_state) if task and task[i_status] == state.TAKEN then @@ -265,8 +265,6 @@ local function utubettl_fiber_iteration(self, processed) if self.ready_space_mode then put_ready(self, task[i_id], task[i_utube], task[i_pri]) end - commit_func() - commited = true self:on_task_change(task, 'ttr') estimated = 0 @@ -276,9 +274,8 @@ local function utubettl_fiber_iteration(self, processed) estimated = et < estimated and et or estimated end end - if not commited then - commit_func() - end + + commit_func() if estimated > 0 or processed > 1000 then -- free refcounter