Skip to content

Commit 7406b87

Browse files
author
Алексей Зорькин
committed
utubettl-on-task-change: commit after on_task_change
1 parent 0037195 commit 7406b87

File tree

3 files changed

+34
-33
lines changed

3 files changed

+34
-33
lines changed

CHANGELOG.md

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,15 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
1313

1414
### Fixed
1515

16+
## [1.4.5] - 2025-09-22
17+
18+
Race condition between utubettl_fiber_iteration fiber and other operation.
19+
20+
### Fixed
21+
22+
- Erasing the task entry from _queue_taken_2 in the `utubettl` driver by
23+
a race condition between utubettl_fiber_iteration fiber and other operation.
24+
1625
## [1.4.4] - 2025-05-26
1726

1827
The patch release fixes incorrect behavior of the utubettl driver with enabled

queue/abstract.lua

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ local function tube_release_all_orphaned_tasks(tube)
6262
}
6363
if taken and session.exist_shared(taken[4]) then
6464
log.info(prefix ..
65-
('skipping task: %s, tube_id: %d'):format(task[1],
65+
('skipping task: %d, tube_id: %d'):format(task[1],
6666
tube.tube_id))
6767
else
6868
tube.raw:release(task[1], {})

queue/abstract/driver/utubettl.lua

Lines changed: 24 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -207,7 +207,6 @@ local function utubettl_fiber_iteration(self, processed)
207207
local estimated = util.MAX_TIMEOUT
208208

209209
local commit_func = begin_if_not_in_txn(self)
210-
local commited = false
211210

212211
-- delayed tasks
213212
task = self.space.index.watch:min(delayed_state)
@@ -221,22 +220,21 @@ local function utubettl_fiber_iteration(self, processed)
221220
if self.ready_space_mode then
222221
update_ready(self, task[i_id], task[i_utube], task[i_pri])
223222
end
224-
self:on_task_change(task, 'delayed')
225-
commit_func()
226-
commited = true
227223

224+
self:on_task_change(task, 'delayed')
228225
estimated = 0
229226
processed = processed + 1
230227
else
231228
estimated = tonumber(task[i_next_event] - now) / 1000000
232229
end
233230
end
234-
if not commited then
235-
commit_func()
236-
end
231+
232+
commit_func()
237233

238234
-- ttl tasks
239235
for _, state in pairs(ttl_states) do
236+
commit_func = begin_if_not_in_txn(self)
237+
240238
task = self.space.index.watch:min{ state }
241239
if task ~= nil and task[i_status] == state then
242240
if now >= task[i_next_event] then
@@ -249,10 +247,12 @@ local function utubettl_fiber_iteration(self, processed)
249247
estimated = et < estimated and et or estimated
250248
end
251249
end
250+
251+
commit_func()
252252
end
253253

254254
commit_func = begin_if_not_in_txn(self)
255-
commited = false
255+
256256
-- ttr tasks
257257
task = self.space.index.watch:min(ttr_state)
258258
if task and task[i_status] == state.TAKEN then
@@ -265,20 +265,17 @@ local function utubettl_fiber_iteration(self, processed)
265265
if self.ready_space_mode then
266266
put_ready(self, task[i_id], task[i_utube], task[i_pri])
267267
end
268-
self:on_task_change(task, 'ttr')
269-
commit_func()
270-
commited = true
271268

269+
self:on_task_change(task, 'ttr')
272270
estimated = 0
273271
processed = processed + 1
274272
else
275273
local et = tonumber(task[i_next_event] - now) / 1000000
276274
estimated = et < estimated and et or estimated
277275
end
278276
end
279-
if not commited then
280-
commit_func()
281-
end
277+
278+
commit_func()
282279

283280
if estimated > 0 or processed > 1000 then
284281
-- free refcounter
@@ -450,9 +447,9 @@ function method.put(self, data, opts)
450447
put_ready(self, task[i_id], task[i_utube], task[i_pri])
451448
end
452449

453-
self:on_task_change(task, 'put')
454450
commit_func()
455451

452+
self:on_task_change(task, 'put')
456453
return task
457454
end
458455

@@ -515,13 +512,13 @@ local function take_ready(self)
515512
end
516513
end
517514

515+
commit_func()
516+
518517
if take_complete then
519518
self:on_task_change(task, 'take')
520-
commit_func()
521519
return task
522520
elseif take_ttl then
523521
self:on_task_change(task, 'ttl')
524-
commit_func()
525522
end
526523
end
527524
end
@@ -554,13 +551,11 @@ local function take(self)
554551
take_complete = true
555552
end
556553

554+
commit_func()
557555
if take_complete then
558556
self:on_task_change(t, 'take')
559-
commit_func()
560557
return t
561558
end
562-
563-
commit_func()
564559
end
565560
end
566561
end
@@ -609,13 +604,11 @@ function method.delete(self, id)
609604
commit_func()
610605

611606
if is_taken then
612-
task = process_neighbour(self, task, 'delete')
607+
return process_neighbour(self, task, 'delete')
613608
else
614609
self:on_task_change(task, 'delete')
610+
return task
615611
end
616-
617-
commit_func()
618-
return task
619612
end
620613

621614
commit_func()
@@ -641,10 +634,9 @@ function method.release(self, id, opts)
641634
put_next_ready(self, task[i_utube])
642635
end
643636

644-
task = process_neighbour(self, task, 'release')
645637
commit_func()
646638

647-
return task
639+
return process_neighbour(self, task, 'release')
648640
end
649641
else
650642
task = self.space:update(id, {
@@ -657,8 +649,8 @@ function method.release(self, id, opts)
657649
end
658650
end
659651

660-
self:on_task_change(task, 'release')
661652
commit_func()
653+
self:on_task_change(task, 'release')
662654
return task
663655
end
664656

@@ -689,12 +681,11 @@ function method.bury(self, id)
689681
end
690682
end
691683

692-
task = process_neighbour(
693-
self, task:transform(i_status, 1, state.BURIED), 'bury'
694-
)
695684
commit_func()
696685

697-
return task
686+
return process_neighbour(
687+
self, task:transform(i_status, 1, state.BURIED), 'bury'
688+
)
698689
end
699690

700691
-- unbury several tasks
@@ -719,8 +710,9 @@ function method.kick(self, count)
719710
update_ready(self, task[i_id], task[i_utube], task[i_pri])
720711
end
721712

722-
self:on_task_change(task, 'kick')
723713
commit_func()
714+
715+
self:on_task_change(task, 'kick')
724716
end
725717
return count
726718
end

0 commit comments

Comments
 (0)