Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 9 additions & 5 deletions ydb/core/kqp/executer_actor/kqp_data_executer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ class TKqpDataExecuter: public TKqpExecuterBase<TKqpDataExecuter, EExecType::Dat
ReadOnlyTx = IsReadOnlyTx();
}

void CheckExecutionComplete() {
bool CheckExecutionComplete() {
ui32 notFinished = 0;
for (const auto& x : ShardStates) {
if (x.second.State != TShardState::EState::Finished) {
Expand All @@ -157,7 +157,7 @@ class TKqpDataExecuter: public TKqpExecuterBase<TKqpDataExecuter, EExecType::Dat
}
}
if (notFinished == 0 && TBase::CheckExecutionComplete()) {
return;
return true;
}

if (IsDebugLogEnabled()) {
Expand All @@ -176,6 +176,8 @@ class TKqpDataExecuter: public TKqpExecuterBase<TKqpDataExecuter, EExecType::Dat
}
LOG_D(sb);
}

return false;
}

bool ForceAcquireSnapshot() const {
Expand Down Expand Up @@ -2145,6 +2147,10 @@ class TKqpDataExecuter: public TKqpExecuterBase<TKqpDataExecuter, EExecType::Dat

ExecuteTasks();

if (CheckExecutionComplete()) {
return;
}

ExecuterStateSpan = NWilson::TSpan(TWilsonKqp::DataExecuterRunTasks, ExecuterSpan.GetTraceId(), "RunTasks", NWilson::EFlags::AUTO_END);
if (ImmediateTx) {
LOG_D("ActorState: " << CurrentStateFuncName() << ", immediate tx, become ExecuteState");
Expand Down Expand Up @@ -2255,7 +2261,7 @@ class TKqpDataExecuter: public TKqpExecuterBase<TKqpDataExecuter, EExecType::Dat
// Volatile transactions must always use generic readsets
VolatileTx ||
// Transactions with topics must always use generic readsets
!topicTxs.empty() ||
!topicTxs.empty() ||
// HTAP transactions always use generic readsets
!evWriteTxs.empty());

Expand Down Expand Up @@ -2588,8 +2594,6 @@ class TKqpDataExecuter: public TKqpExecuterBase<TKqpDataExecuter, EExecType::Dat
}
}
PropagateChannelsUpdates(updates);

CheckExecutionComplete();
}

void ExecuteTopicTabletTransactions(TTopicTabletTxs& topicTxs) {
Expand Down
Loading