Skip to content

Commit 0037195

Browse files
author
Алексей Зорькин
committed
utubettl-on-task-change: commit after on_task_change
1 parent a8f5e6b commit 0037195

File tree

2 files changed

+21
-16
lines changed

2 files changed

+21
-16
lines changed

queue/abstract.lua

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ local function tube_release_all_orphaned_tasks(tube)
6262
}
6363
if taken and session.exist_shared(taken[4]) then
6464
log.info(prefix ..
65-
('skipping task: %d, tube_id: %d'):format(task[1],
65+
('skipping task: %s, tube_id: %d'):format(task[1],
6666
tube.tube_id))
6767
else
6868
tube.raw:release(task[1], {})

queue/abstract/driver/utubettl.lua

Lines changed: 20 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -221,10 +221,10 @@ local function utubettl_fiber_iteration(self, processed)
221221
if self.ready_space_mode then
222222
update_ready(self, task[i_id], task[i_utube], task[i_pri])
223223
end
224+
self:on_task_change(task, 'delayed')
224225
commit_func()
225226
commited = true
226227

227-
self:on_task_change(task, 'delayed')
228228
estimated = 0
229229
processed = processed + 1
230230
else
@@ -265,10 +265,10 @@ local function utubettl_fiber_iteration(self, processed)
265265
if self.ready_space_mode then
266266
put_ready(self, task[i_id], task[i_utube], task[i_pri])
267267
end
268+
self:on_task_change(task, 'ttr')
268269
commit_func()
269270
commited = true
270271

271-
self:on_task_change(task, 'ttr')
272272
estimated = 0
273273
processed = processed + 1
274274
else
@@ -450,9 +450,9 @@ function method.put(self, data, opts)
450450
put_ready(self, task[i_id], task[i_utube], task[i_pri])
451451
end
452452

453+
self:on_task_change(task, 'put')
453454
commit_func()
454455

455-
self:on_task_change(task, 'put')
456456
return task
457457
end
458458

@@ -515,13 +515,13 @@ local function take_ready(self)
515515
end
516516
end
517517

518-
commit_func()
519-
520518
if take_complete then
521519
self:on_task_change(task, 'take')
520+
commit_func()
522521
return task
523522
elseif take_ttl then
524523
self:on_task_change(task, 'ttl')
524+
commit_func()
525525
end
526526
end
527527
end
@@ -554,11 +554,13 @@ local function take(self)
554554
take_complete = true
555555
end
556556

557-
commit_func()
558557
if take_complete then
559558
self:on_task_change(t, 'take')
559+
commit_func()
560560
return t
561561
end
562+
563+
commit_func()
562564
end
563565
end
564566
end
@@ -607,11 +609,13 @@ function method.delete(self, id)
607609
commit_func()
608610

609611
if is_taken then
610-
return process_neighbour(self, task, 'delete')
612+
task = process_neighbour(self, task, 'delete')
611613
else
612614
self:on_task_change(task, 'delete')
613-
return task
614615
end
616+
617+
commit_func()
618+
return task
615619
end
616620

617621
commit_func()
@@ -637,9 +641,10 @@ function method.release(self, id, opts)
637641
put_next_ready(self, task[i_utube])
638642
end
639643

644+
task = process_neighbour(self, task, 'release')
640645
commit_func()
641646

642-
return process_neighbour(self, task, 'release')
647+
return task
643648
end
644649
else
645650
task = self.space:update(id, {
@@ -652,8 +657,8 @@ function method.release(self, id, opts)
652657
end
653658
end
654659

655-
commit_func()
656660
self:on_task_change(task, 'release')
661+
commit_func()
657662
return task
658663
end
659664

@@ -684,11 +689,12 @@ function method.bury(self, id)
684689
end
685690
end
686691

692+
task = process_neighbour(
693+
self, task:transform(i_status, 1, state.BURIED), 'bury'
694+
)
687695
commit_func()
688696

689-
return process_neighbour(
690-
self, task:transform(i_status, 1, state.BURIED), 'bury'
691-
)
697+
return task
692698
end
693699

694700
-- unbury several tasks
@@ -713,9 +719,8 @@ function method.kick(self, count)
713719
update_ready(self, task[i_id], task[i_utube], task[i_pri])
714720
end
715721

716-
commit_func()
717-
718722
self:on_task_change(task, 'kick')
723+
commit_func()
719724
end
720725
return count
721726
end

0 commit comments

Comments
 (0)