diff --git a/ydb/core/kqp/common/kqp_tx.cpp b/ydb/core/kqp/common/kqp_tx.cpp index 66f5d4d14af8..f6022c112c52 100644 --- a/ydb/core/kqp/common/kqp_tx.cpp +++ b/ydb/core/kqp/common/kqp_tx.cpp @@ -335,7 +335,7 @@ bool HasOltpTableWriteInTx(const NKqpProto::TKqpPhyQuery& physicalQuery) { return false; } -bool HasUncommittedChangesRead(THashSet& modifiedTables, const NKqpProto::TKqpPhyQuery& physicalQuery, const bool canUseVolatileTx, const bool commit) { +bool HasUncommittedChangesRead(THashSet& modifiedTables, const NKqpProto::TKqpPhyQuery& physicalQuery, const bool commit) { auto getTable = [](const NKqpProto::TKqpPhyTableId& table) { return NKikimr::TTableId(table.GetOwnerId(), table.GetTableId()); }; @@ -402,7 +402,7 @@ bool HasUncommittedChangesRead(THashSet& modifiedTables, cons NKikimrKqp::TKqpTableSinkSettings settings; YQL_ENSURE(sink.GetInternalSink().GetSettings().UnpackTo(&settings), "Failed to unpack settings"); modifiedTables.insert(getTable(settings.GetTable())); - if (settings.GetType() == NKikimrKqp::TKqpTableSinkSettings::MODE_INSERT && (!commit || !canUseVolatileTx)) { + if (settings.GetType() == NKikimrKqp::TKqpTableSinkSettings::MODE_INSERT && !commit) { // INSERT with sink should be executed immediately, because it returns an error in case of duplicate rows. return true; } @@ -415,21 +415,5 @@ bool HasUncommittedChangesRead(THashSet& modifiedTables, cons return false; } -bool HasSinkInsert(const TKqpPhyTxHolder::TConstPtr& tx) { - for (const auto &stage : tx->GetStages()) { - for (const auto& sink : stage.GetSinks()) { - if (sink.GetTypeCase() == NKqpProto::TKqpSink::kInternalSink) { - YQL_ENSURE(sink.GetInternalSink().GetSettings().Is()); - NKikimrKqp::TKqpTableSinkSettings settings; - YQL_ENSURE(sink.GetInternalSink().GetSettings().UnpackTo(&settings), "Failed to unpack settings"); - if (settings.GetType() == NKikimrKqp::TKqpTableSinkSettings::MODE_INSERT) { - return true; - } - } - } - } - return false; -} - } // namespace NKqp } // namespace NKikimr diff --git a/ydb/core/kqp/common/kqp_tx.h b/ydb/core/kqp/common/kqp_tx.h index 3eb4715b07ef..77bdb4345f8f 100644 --- a/ydb/core/kqp/common/kqp_tx.h +++ b/ydb/core/kqp/common/kqp_tx.h @@ -165,11 +165,7 @@ class TShardIdToTableInfo { }; using TShardIdToTableInfoPtr = std::shared_ptr; -bool HasUncommittedChangesRead( - THashSet& modifiedTables, - const NKqpProto::TKqpPhyQuery& physicalQuery, - const bool canUseVolatileTx, - const bool commit); +bool HasUncommittedChangesRead(THashSet& modifiedTables, const NKqpProto::TKqpPhyQuery& physicalQuery, const bool commit); class TKqpTransactionContext : public NYql::TKikimrTransactionContextBase { public: @@ -325,9 +321,9 @@ class TKqpTransactionContext : public NYql::TKikimrTransactionContextBase { return true; } - void ApplyPhysicalQuery(const NKqpProto::TKqpPhyQuery& phyQuery, const bool canUseVolatileTx, const bool commit) { + void ApplyPhysicalQuery(const NKqpProto::TKqpPhyQuery& phyQuery, const bool commit) { NeedUncommittedChangesFlush = (DeferredEffects.Size() > kMaxDeferredEffects) - || HasUncommittedChangesRead(ModifiedTablesSinceLastFlush, phyQuery, canUseVolatileTx, commit); + || HasUncommittedChangesRead(ModifiedTablesSinceLastFlush, phyQuery, commit); if (NeedUncommittedChangesFlush) { ModifiedTablesSinceLastFlush.clear(); } @@ -532,6 +528,4 @@ bool HasOlapTableWriteInTx(const NKqpProto::TKqpPhyQuery& physicalQuery); bool HasOltpTableReadInTx(const NKqpProto::TKqpPhyQuery& physicalQuery); bool HasOltpTableWriteInTx(const NKqpProto::TKqpPhyQuery& physicalQuery); -bool HasSinkInsert(const TKqpPhyTxHolder::TConstPtr& tx); - } // namespace NKikimr::NKqp diff --git a/ydb/core/kqp/host/kqp_host.cpp b/ydb/core/kqp/host/kqp_host.cpp index d1241a1ac26d..031c1a3a2f84 100644 --- a/ydb/core/kqp/host/kqp_host.cpp +++ b/ydb/core/kqp/host/kqp_host.cpp @@ -1635,7 +1635,13 @@ class TKqpHost : public IKqpHost { YQL_ENSURE(ExprCtxStorage); - auto prepareData = PrepareRewrite(compileResult.QueryExpr, *ExprCtxStorage, *TypesCtx, SessionCtx, Cluster); + auto prepareData = PrepareRewrite( + compileResult.QueryExpr, + *ExprCtxStorage, + *TypesCtx, + SessionCtx, + *FuncRegistry, + Cluster); return MakeIntrusive( compileResult.QueryExpr, diff --git a/ydb/core/kqp/host/kqp_statement_rewrite.cpp b/ydb/core/kqp/host/kqp_statement_rewrite.cpp index 8d1a7ac85ba0..5e9627adb491 100644 --- a/ydb/core/kqp/host/kqp_statement_rewrite.cpp +++ b/ydb/core/kqp/host/kqp_statement_rewrite.cpp @@ -98,6 +98,7 @@ namespace { NYql::TExprContext& exprCtx, NYql::TTypeAnnotationContext& typeCtx, const TIntrusivePtr& sessionCtx, + const NMiniKQL::IFunctionRegistry& funcRegistry, const TString& cluster) { NYql::NNodes::TExprBase expr(root); auto maybeWrite = expr.Maybe(); @@ -135,6 +136,7 @@ namespace { auto typeTransformer = NYql::TTransformationPipeline(&typeCtx) .AddServiceTransformers() + .AddExpressionEvaluation(funcRegistry) .AddPreTypeAnnotation() .AddIOAnnotation() .AddTypeAnnotationTransformer(CreateKqpTypeAnnotationTransformer(cluster, sessionCtx->TablesPtr(), typeCtx, sessionCtx->ConfigPtr())) @@ -428,6 +430,7 @@ TPrepareRewriteInfo PrepareRewrite( NYql::TExprContext& exprCtx, NYql::TTypeAnnotationContext& typeCtx, const TIntrusivePtr& sessionCtx, + const NMiniKQL::IFunctionRegistry& funcRegistry, const TString& cluster) { // CREATE TABLE AS statement can be used only with perstatement execution. // Thus we assume that there is only one such statement. (it was checked in CheckRewrite) @@ -440,7 +443,7 @@ TPrepareRewriteInfo PrepareRewrite( }); YQL_ENSURE(createTableAsNode); - return PrepareCreateTableAs(createTableAsNode, exprCtx, typeCtx, sessionCtx, cluster); + return PrepareCreateTableAs(createTableAsNode, exprCtx, typeCtx, sessionCtx, funcRegistry, cluster); } TVector RewriteExpression( diff --git a/ydb/core/kqp/host/kqp_statement_rewrite.h b/ydb/core/kqp/host/kqp_statement_rewrite.h index 08602016e72e..0e8835cec806 100644 --- a/ydb/core/kqp/host/kqp_statement_rewrite.h +++ b/ydb/core/kqp/host/kqp_statement_rewrite.h @@ -25,6 +25,7 @@ TPrepareRewriteInfo PrepareRewrite( NYql::TExprContext& exprCtx, NYql::TTypeAnnotationContext& typeCtx, const TIntrusivePtr& sessionCtx, + const NMiniKQL::IFunctionRegistry& funcRegistry, const TString& cluster); TVector RewriteExpression( diff --git a/ydb/core/kqp/runtime/kqp_write_actor.cpp b/ydb/core/kqp/runtime/kqp_write_actor.cpp index bd182be428b9..4aa31104af2d 100644 --- a/ydb/core/kqp/runtime/kqp_write_actor.cpp +++ b/ydb/core/kqp/runtime/kqp_write_actor.cpp @@ -1961,19 +1961,20 @@ class TKqpBufferWriteActor :public TActorBootstrapped, pub return Process(); } - bool Prepare(const ui64 txId, NWilson::TTraceId traceId) { + bool Prepare(std::optional traceId) { UpdateTracingState("Commit", std::move(traceId)); OperationStartTime = TInstant::Now(); CA_LOG_D("Start prepare for distributed commit"); YQL_ENSURE(State == EState::WRITING); + YQL_ENSURE(!NeedToFlushBeforeCommit); State = EState::PREPARING; for (auto& [_, queue] : DataQueues) { YQL_ENSURE(queue.empty()); } - TxId = txId; + YQL_ENSURE(TxId); for (auto& [_, info] : WriteInfos) { - info.WriteTableActor->SetPrepare(txId); + info.WriteTableActor->SetPrepare(*TxId); } Close(); if (!Process()) { @@ -2424,8 +2425,14 @@ class TKqpBufferWriteActor :public TActorBootstrapped, pub TxManager->StartExecute(); ImmediateCommit(std::move(ev->TraceId)); } else { - TxManager->StartPrepare(); - Prepare(ev->Get()->TxId, std::move(ev->TraceId)); + AFL_ENSURE(ev->Get()->TxId); + TxId = ev->Get()->TxId; + if (NeedToFlushBeforeCommit) { + Flush(std::move(ev->TraceId)); + } else { + TxManager->StartPrepare(); + Prepare(std::move(ev->TraceId)); + } } } @@ -2805,6 +2812,14 @@ class TKqpBufferWriteActor :public TActorBootstrapped, pub UpdateTracingState("Write", BufferWriteActorSpan.GetTraceId()); OnOperationFinished(Counters->BufferActorFlushLatencyHistogram); State = EState::WRITING; + AFL_ENSURE(!TxId || NeedToFlushBeforeCommit); // TxId => NeedToFlushBeforeCommit + NeedToFlushBeforeCommit = false; + if (TxId) { + TxManager->StartPrepare(); + Prepare(std::nullopt); + return; + } + Send(ExecuterActorId, new TEvKqpBuffer::TEvResult{ BuildStats() }); @@ -2846,8 +2861,11 @@ class TKqpBufferWriteActor :public TActorBootstrapped, pub ReplyErrorAndDieImpl(statusCode, std::move(issues)); } - void UpdateTracingState(const char* name, NWilson::TTraceId traceId) { - BufferWriteActorStateSpan = NWilson::TSpan(TWilsonKqp::BufferWriteActorState, std::move(traceId), + void UpdateTracingState(const char* name, std::optional traceId) { + if (!traceId) { + return; + } + BufferWriteActorStateSpan = NWilson::TSpan(TWilsonKqp::BufferWriteActorState, std::move(*traceId), name, NWilson::EFlags::AUTO_END); if (BufferWriteActorStateSpan.GetTraceId() != BufferWriteActorSpan.GetTraceId()) { BufferWriteActorStateSpan.Link(BufferWriteActorSpan.GetTraceId()); @@ -2929,6 +2947,7 @@ class TKqpBufferWriteActor :public TActorBootstrapped, pub EState State; bool HasError = false; + bool NeedToFlushBeforeCommit = false; THashMap> DataQueues; struct TAckMessage { diff --git a/ydb/core/kqp/session_actor/kqp_query_state.h b/ydb/core/kqp/session_actor/kqp_query_state.h index a4ac96c8d778..10c7b55952f4 100644 --- a/ydb/core/kqp/session_actor/kqp_query_state.h +++ b/ydb/core/kqp/session_actor/kqp_query_state.h @@ -348,7 +348,7 @@ class TKqpQueryState : public TNonCopyable { return ::NKikimr::NKqp::NeedSnapshot(*TxCtx, config, /*rollback*/ false, Commit, PreparedQuery->GetPhysicalQuery()); } - bool ShouldCommitWithCurrentTx(const TKqpPhyTxHolder::TConstPtr& tx, const bool canUseVolatileTx) { + bool ShouldCommitWithCurrentTx(const TKqpPhyTxHolder::TConstPtr& tx) { const auto& phyQuery = PreparedQuery->GetPhysicalQuery(); if (!Commit) { return false; @@ -376,7 +376,7 @@ class TKqpQueryState : public TNonCopyable { } if (TxCtx->NeedUncommittedChangesFlush || AppData()->FeatureFlags.GetEnableForceImmediateEffectsExecution()) { - if (tx && tx->GetHasEffects() && (!HasSinkInsert(tx) || canUseVolatileTx)) { + if (tx && tx->GetHasEffects()) { YQL_ENSURE(tx->ResultsSize() == 0); // commit can be applied to the last transaction with effects return CurrentTx + 1 == phyQuery.TransactionsSize(); diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp index e8da31adbca6..01ac5d9eb41a 100644 --- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp +++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp @@ -950,10 +950,7 @@ class TKqpSessionActor : public TActorBootstrapped { } QueryState->TxCtx->SetTempTables(QueryState->TempTablesState); - QueryState->TxCtx->ApplyPhysicalQuery( - phyQuery, - CanUseVolatileTx(), - QueryState->Commit); + QueryState->TxCtx->ApplyPhysicalQuery(phyQuery, QueryState->Commit); auto [success, issues] = QueryState->TxCtx->ApplyTableOperations(phyQuery.GetTableOps(), phyQuery.GetTableInfos(), EKikimrQueryType::Dml); if (!success) { @@ -1195,7 +1192,7 @@ class TKqpSessionActor : public TActorBootstrapped { ExecutePartitioned(tx); } else if (QueryState->TxCtx->ShouldExecuteDeferredEffects(tx)) { ExecuteDeferredEffectsImmediately(tx); - } else if (auto commit = QueryState->ShouldCommitWithCurrentTx(tx, CanUseVolatileTx()); commit || tx) { + } else if (auto commit = QueryState->ShouldCommitWithCurrentTx(tx); commit || tx) { ExecutePhyTx(tx, commit); } else { ReplySuccess(); @@ -2900,12 +2897,6 @@ class TKqpSessionActor : public TActorBootstrapped { } } - bool CanUseVolatileTx() const { - return AppData()->FeatureFlags.GetEnableDataShardVolatileTransactions() - && !QueryState->TxCtx->TopicOperations.HasOperations() - && !QueryState->TxCtx->HasOlapTable; - } - private: TActorId Owner; TKqpQueryCachePtr QueryCache; diff --git a/ydb/core/kqp/ut/effects/kqp_effects_ut.cpp b/ydb/core/kqp/ut/effects/kqp_effects_ut.cpp index 04482c6539b3..c709eefc4820 100644 --- a/ydb/core/kqp/ut/effects/kqp_effects_ut.cpp +++ b/ydb/core/kqp/ut/effects/kqp_effects_ut.cpp @@ -525,6 +525,52 @@ Y_UNIT_TEST_SUITE(KqpEffects) { UNIT_ASSERT_VALUES_EQUAL(reads[0]["columns"].GetArraySafe().size(), 3); } + Y_UNIT_TEST_TWIN(EmptyUpdate, UseSink) { + NKikimrConfig::TAppConfig appConfig; + appConfig.MutableTableServiceConfig()->SetEnableOltpSink(UseSink); + auto settings = TKikimrSettings() + .SetAppConfig(appConfig) + .SetWithSampleTables(false); + TKikimrRunner kikimr(settings); + auto db = kikimr.GetTableClient(); + auto session = db.CreateSession().GetValueSync().GetSession(); + + kikimr.GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::TX_DATASHARD, NActors::NLog::PRI_TRACE); + + { + auto schemeResult = session.ExecuteSchemeQuery(R"( + --!syntax_v1 + CREATE TABLE T1 ( + Key Uint32, + Value Uint32, + Timestamp Timestamp, + PRIMARY KEY (Key) + ); + CREATE TABLE T2 ( + Key Uint32, + Value Uint32, + PRIMARY KEY (Key) + ); + )").ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(schemeResult.GetStatus(), EStatus::SUCCESS, schemeResult.GetIssues().ToString()); + } + Cerr << "!!!UPDATE TABLE" << Endl; + { + auto result = session.ExecuteDataQuery(R"( + --!syntax_v1 + $data = SELECT 1u AS Key, 1u AS Value; + UPDATE T1 ON SELECT Key, Value FROM $data; + DELETE FROM T2 WHERE Key = 1; + )", TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + } + Cerr << "!!!DROP TABLE" << Endl; + { + auto schemeResult = session.DropTable("/Root/T1").ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(schemeResult.GetStatus(), EStatus::SUCCESS, schemeResult.GetIssues().ToString()); + } + } + Y_UNIT_TEST_TWIN(AlterDuringUpsertTransaction, UseSink) { NKikimrConfig::TAppConfig appConfig; appConfig.MutableTableServiceConfig()->SetEnableOltpSink(UseSink); diff --git a/ydb/core/kqp/ut/query/kqp_query_ut.cpp b/ydb/core/kqp/ut/query/kqp_query_ut.cpp index a45047ad5817..71bc13a08825 100644 --- a/ydb/core/kqp/ut/query/kqp_query_ut.cpp +++ b/ydb/core/kqp/ut/query/kqp_query_ut.cpp @@ -2674,7 +2674,124 @@ Y_UNIT_TEST_SUITE(KqpQuery) { UPDATE test_table SET data = "a" )", NYdb::NQuery::TTxControl::NoTx()).ExtractValueSync(); UNIT_ASSERT_VALUES_EQUAL_C(hangingResult.GetStatus(), EStatus::SUCCESS, hangingResult.GetIssues().ToString()); - } + } + + Y_UNIT_TEST(CreateAsSelectView) { + NKikimrConfig::TAppConfig appConfig; + appConfig.MutableTableServiceConfig()->SetEnableOlapSink(true); + appConfig.MutableTableServiceConfig()->SetEnableCreateTableAs(true); + appConfig.MutableTableServiceConfig()->SetEnablePerStatementQueryExecution(true); + auto settings = TKikimrSettings() + .SetAppConfig(appConfig) + .SetWithSampleTables(false) + .SetEnableTempTables(true); + TKikimrRunner kikimr(settings); + + Tests::NCommon::TLoggerInit(kikimr).SetComponents({ NKikimrServices::TX_COLUMNSHARD }, "CS").Initialize(); + + auto client = kikimr.GetQueryClient(); + + { + auto result = client.ExecuteQuery( R"( + CREATE TABLE `l_source` ( + id Uint64, + num Uint64, + unused String, + PRIMARY KEY (id) + ); + + CREATE TABLE `r_source` ( + id Uint64, + id2 Uint64, + unused String, + PRIMARY KEY (id) + ); + )", NYdb::NQuery::TTxControl::NoTx()).ExtractValueSync(); + UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString()); + } + + { + auto result = client.ExecuteQuery( R"( + CREATE VIEW `l` + with (security_invoker = TRUE) + AS ( + SELECT + id, + num + FROM + `l_source` + ); + + CREATE VIEW `r` + with (security_invoker = TRUE) + AS ( + SELECT + id, + id2 + FROM + `r_source` + ); + )", NYdb::NQuery::TTxControl::NoTx()).ExtractValueSync(); + UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString()); + } + + { + auto prepareResult = client.ExecuteQuery(R"( + INSERT INTO `/Root/l_source` (id, num) VALUES + (1u, 1u), (100u, 100u), (10u, 10u); + INSERT INTO `/Root/r_source` (id, id2) VALUES + (1u, 1u), (100u, 100u), (10u, 10u); + )", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT_C(prepareResult.IsSuccess(), prepareResult.GetIssues().ToString()); + } + + { + auto prepareResult = client.ExecuteQuery(R"( + CREATE TABLE `table1` + (PRIMARY KEY (id)) + AS ( + SELECT + id, num + FROM `l` + ) + )", NYdb::NQuery::TTxControl::NoTx()).ExtractValueSync(); + UNIT_ASSERT_C(prepareResult.IsSuccess(), prepareResult.GetIssues().ToString()); + } + + { + auto it = client.StreamExecuteQuery(R"( + SELECT id, num FROM `/Root/table1` ORDER BY id; + )", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(it.GetStatus(), EStatus::SUCCESS, it.GetIssues().ToString()); + TString output = StreamResultToYson(it); + CompareYson(output, R"([[[1u];[1u]];[[10u];[10u]];[[100u];[100u]]])"); + } + + { + auto prepareResult = client.ExecuteQuery(R"( + CREATE TABLE `table2` + (PRIMARY KEY (id2)) + AS ( + SELECT + r.id2 AS id2, + sum(l.num) AS num + FROM `l` AS l + LEFT JOIN `r` AS r ON l.id = r.id + GROUP BY r.id2 + ) + )", NYdb::NQuery::TTxControl::NoTx()).ExtractValueSync(); + UNIT_ASSERT_C(prepareResult.IsSuccess(), prepareResult.GetIssues().ToString()); + } + + { + auto it = client.StreamExecuteQuery(R"( + SELECT id2, num FROM `/Root/table2` ORDER BY id2; + )", NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(it.GetStatus(), EStatus::SUCCESS, it.GetIssues().ToString()); + TString output = StreamResultToYson(it); + CompareYson(output, R"([[[1u];[1u]];[[10u];[10u]];[[100u];[100u]]])"); + } + } } } // namespace NKqp diff --git a/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp b/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp index 8c667b4266b7..87d924bf5b29 100644 --- a/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp +++ b/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp @@ -348,7 +348,7 @@ EExecutionStatus TExecuteKqpDataTxUnit::Execute(TOperation::TPtr op, TTransactio // Note: any transaction (e.g. immediate or non-volatile) may decide to commit as volatile due to dependencies // Such transactions would have no participants and become immediately committed auto commitTxIds = dataTx->GetVolatileCommitTxIds(); - if (commitTxIds) { + if (commitTxIds || isArbiter) { TVector participants(awaitingDecisions.begin(), awaitingDecisions.end()); DataShard.GetVolatileTxManager().PersistAddVolatileTx( txId, @@ -360,6 +360,8 @@ EExecutionStatus TExecuteKqpDataTxUnit::Execute(TOperation::TPtr op, TTransactio dataTx->GetVolatileCommitOrdered(), isArbiter, txc); + } else { + awaitingDecisions.clear(); } if (dataTx->GetPerformedUserReads()) { diff --git a/ydb/core/tx/datashard/execute_write_unit.cpp b/ydb/core/tx/datashard/execute_write_unit.cpp index a6dea52258ce..0750a1467afc 100644 --- a/ydb/core/tx/datashard/execute_write_unit.cpp +++ b/ydb/core/tx/datashard/execute_write_unit.cpp @@ -433,7 +433,7 @@ class TExecuteWriteUnit : public TExecutionUnit { // Note: any transaction (e.g. immediate or non-volatile) may decide to commit as volatile due to dependencies // Such transactions would have no participants and become immediately committed auto commitTxIds = userDb.GetVolatileCommitTxIds(); - if (commitTxIds) { + if (commitTxIds || isArbiter) { TVector participants(awaitingDecisions.begin(), awaitingDecisions.end()); DataShard.GetVolatileTxManager().PersistAddVolatileTx( userDb.GetVolatileTxId(), @@ -446,6 +446,8 @@ class TExecuteWriteUnit : public TExecutionUnit { isArbiter, txc ); + } else { + awaitingDecisions.clear(); } if (userDb.GetPerformedUserReads()) { diff --git a/ydb/core/tx/datashard/volatile_tx.cpp b/ydb/core/tx/datashard/volatile_tx.cpp index 12a2cf887ca7..851c044fee8e 100644 --- a/ydb/core/tx/datashard/volatile_tx.cpp +++ b/ydb/core/tx/datashard/volatile_tx.cpp @@ -519,9 +519,6 @@ namespace NKikimr::NDataShard { { using Schema = TDataShard::Schema; - Y_VERIFY_S(!commitTxIds.empty(), - "Unexpected volatile txId# " << txId << " @" << version << " without commits"); - auto res = VolatileTxs.insert( std::make_pair(txId, std::make_unique())); Y_VERIFY_S(res.second, "Cannot add volatile txId# " << txId << " @" << version