- 
                Notifications
    You must be signed in to change notification settings 
- Fork 55
utubettl: commit transaction after on_task_change #250
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Conversation
| 
 Значит ли это, что операции с queue вы выполняете внутри транзакций? | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Похоже, что это более распространенная проблема в наших драйверах, чем кажется на первый взгляд.
Нам нужно больше времени на анализ, чтобы исправить такую гонку во всех драйверах и ситуациях.
Я предлагаю ограничиться изменениями в tubettl_fiber_iteration, удалив остальные изменения в драйвере. Чтобы ещё больше не задерживать этот PR и закрыть текущую вашу боль.
Кроме того, код после изменения выглядит немного странным и запутанным. Я бы предложил ограничиться таким изменением:
diff --git a/queue/abstract/driver/utubettl.lua b/queue/abstract/driver/utubettl.lua
index 7b726ba..f5cdf20 100644
--- a/queue/abstract/driver/utubettl.lua
+++ b/queue/abstract/driver/utubettl.lua
@@ -207,7 +207,6 @@ local function utubettl_fiber_iteration(self, processed)
     local estimated = util.MAX_TIMEOUT
 
     local commit_func = begin_if_not_in_txn(self)
-    local commited = false
 
     -- delayed tasks
     task = self.space.index.watch:min(delayed_state)
@@ -221,8 +220,6 @@ local function utubettl_fiber_iteration(self, processed)
             if self.ready_space_mode then
                 update_ready(self, task[i_id], task[i_utube], task[i_pri])
             end
-            commit_func()
-            commited = true
 
             self:on_task_change(task, 'delayed')
             estimated = 0
@@ -231,16 +228,18 @@ local function utubettl_fiber_iteration(self, processed)
             estimated = tonumber(task[i_next_event] - now) / 1000000
         end
     end
-    if not commited then
-        commit_func()
-    end
+
+    commit_func()
 
     -- ttl tasks
     for _, state in pairs(ttl_states) do
+        local commit_func = begin_if_not_in_txn(self)
+
         task = self.space.index.watch:min{ state }
         if task ~= nil and task[i_status] == state then
             if now >= task[i_next_event] then
                 task = self:delete(task[i_id]):transform(2, 1, state.DONE)
+
                 self:on_task_change(task, 'ttl')
                 estimated = 0
                 processed = processed + 1
@@ -249,10 +248,13 @@ local function utubettl_fiber_iteration(self, processed)
                 estimated = et < estimated and et or estimated
             end
         end
+
+        commit_func()
     end
 
+
     commit_func = begin_if_not_in_txn(self)
-    commited = false
+
     -- ttr tasks
     task = self.space.index.watch:min(ttr_state)
     if task and task[i_status] == state.TAKEN then
@@ -265,8 +267,6 @@ local function utubettl_fiber_iteration(self, processed)
             if self.ready_space_mode then
                 put_ready(self, task[i_id], task[i_utube], task[i_pri])
             end
-            commit_func()
-            commited = true
 
             self:on_task_change(task, 'ttr')
             estimated = 0
@@ -276,9 +276,8 @@ local function utubettl_fiber_iteration(self, processed)
             estimated = et < estimated and et or estimated
         end
     end
-    if not commited then
-        commit_func()
-    end
+
+    commit_func()
 
     if estimated > 0 or processed > 1000 then
         -- free refcounterТут очевидно, что для каждой отдельной операции открываем транзакцию.
| Привет! По PR'у есть замечание: если mvcc отключен, то если внутри  | 
Встретил такую ситуацию при использовании, фикс в данном мр:
Файбер utubettl_fiber_iteration перевел задачу в статус R, сделав коммит, и в этот момент произошел свич контекст и пошел выполняться мой take из другого файбера, который взял задачу, обновил ее и даже записал инфу о take в _queue_taken_2, а уже после пришел в on_task_change файбер с delayed и удалил запись о взятой задаче. Как итог - задаче не может быть выполнен ack/release, так как не проходим проверку в функции check_task_is_taken.
Также исправил форматирование в логе на %s, так как при большом числе, которое не вмещается в number, будет ошибка.