Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 2 additions & 18 deletions ydb/core/kqp/common/kqp_tx.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -335,7 +335,7 @@ bool HasOltpTableWriteInTx(const NKqpProto::TKqpPhyQuery& physicalQuery) {
return false;
}

bool HasUncommittedChangesRead(THashSet<NKikimr::TTableId>& modifiedTables, const NKqpProto::TKqpPhyQuery& physicalQuery, const bool canUseVolatileTx, const bool commit) {
bool HasUncommittedChangesRead(THashSet<NKikimr::TTableId>& modifiedTables, const NKqpProto::TKqpPhyQuery& physicalQuery, const bool commit) {
auto getTable = [](const NKqpProto::TKqpPhyTableId& table) {
return NKikimr::TTableId(table.GetOwnerId(), table.GetTableId());
};
Expand Down Expand Up @@ -402,7 +402,7 @@ bool HasUncommittedChangesRead(THashSet<NKikimr::TTableId>& modifiedTables, cons
NKikimrKqp::TKqpTableSinkSettings settings;
YQL_ENSURE(sink.GetInternalSink().GetSettings().UnpackTo(&settings), "Failed to unpack settings");
modifiedTables.insert(getTable(settings.GetTable()));
if (settings.GetType() == NKikimrKqp::TKqpTableSinkSettings::MODE_INSERT && (!commit || !canUseVolatileTx)) {
if (settings.GetType() == NKikimrKqp::TKqpTableSinkSettings::MODE_INSERT && !commit) {
// INSERT with sink should be executed immediately, because it returns an error in case of duplicate rows.
return true;
}
Expand All @@ -415,21 +415,5 @@ bool HasUncommittedChangesRead(THashSet<NKikimr::TTableId>& modifiedTables, cons
return false;
}

bool HasSinkInsert(const TKqpPhyTxHolder::TConstPtr& tx) {
for (const auto &stage : tx->GetStages()) {
for (const auto& sink : stage.GetSinks()) {
if (sink.GetTypeCase() == NKqpProto::TKqpSink::kInternalSink) {
YQL_ENSURE(sink.GetInternalSink().GetSettings().Is<NKikimrKqp::TKqpTableSinkSettings>());
NKikimrKqp::TKqpTableSinkSettings settings;
YQL_ENSURE(sink.GetInternalSink().GetSettings().UnpackTo(&settings), "Failed to unpack settings");
if (settings.GetType() == NKikimrKqp::TKqpTableSinkSettings::MODE_INSERT) {
return true;
}
}
}
}
return false;
}

} // namespace NKqp
} // namespace NKikimr
12 changes: 3 additions & 9 deletions ydb/core/kqp/common/kqp_tx.h
Original file line number Diff line number Diff line change
Expand Up @@ -165,11 +165,7 @@ class TShardIdToTableInfo {
};
using TShardIdToTableInfoPtr = std::shared_ptr<TShardIdToTableInfo>;

bool HasUncommittedChangesRead(
THashSet<NKikimr::TTableId>& modifiedTables,
const NKqpProto::TKqpPhyQuery& physicalQuery,
const bool canUseVolatileTx,
const bool commit);
bool HasUncommittedChangesRead(THashSet<NKikimr::TTableId>& modifiedTables, const NKqpProto::TKqpPhyQuery& physicalQuery, const bool commit);

class TKqpTransactionContext : public NYql::TKikimrTransactionContextBase {
public:
Expand Down Expand Up @@ -325,9 +321,9 @@ class TKqpTransactionContext : public NYql::TKikimrTransactionContextBase {
return true;
}

void ApplyPhysicalQuery(const NKqpProto::TKqpPhyQuery& phyQuery, const bool canUseVolatileTx, const bool commit) {
void ApplyPhysicalQuery(const NKqpProto::TKqpPhyQuery& phyQuery, const bool commit) {
NeedUncommittedChangesFlush = (DeferredEffects.Size() > kMaxDeferredEffects)
|| HasUncommittedChangesRead(ModifiedTablesSinceLastFlush, phyQuery, canUseVolatileTx, commit);
|| HasUncommittedChangesRead(ModifiedTablesSinceLastFlush, phyQuery, commit);
if (NeedUncommittedChangesFlush) {
ModifiedTablesSinceLastFlush.clear();
}
Expand Down Expand Up @@ -532,6 +528,4 @@ bool HasOlapTableWriteInTx(const NKqpProto::TKqpPhyQuery& physicalQuery);
bool HasOltpTableReadInTx(const NKqpProto::TKqpPhyQuery& physicalQuery);
bool HasOltpTableWriteInTx(const NKqpProto::TKqpPhyQuery& physicalQuery);

bool HasSinkInsert(const TKqpPhyTxHolder::TConstPtr& tx);

} // namespace NKikimr::NKqp
8 changes: 7 additions & 1 deletion ydb/core/kqp/host/kqp_host.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1635,7 +1635,13 @@ class TKqpHost : public IKqpHost {

YQL_ENSURE(ExprCtxStorage);

auto prepareData = PrepareRewrite(compileResult.QueryExpr, *ExprCtxStorage, *TypesCtx, SessionCtx, Cluster);
auto prepareData = PrepareRewrite(
compileResult.QueryExpr,
*ExprCtxStorage,
*TypesCtx,
SessionCtx,
*FuncRegistry,
Cluster);

return MakeIntrusive<TAsyncSplitQueryResult>(
compileResult.QueryExpr,
Expand Down
5 changes: 4 additions & 1 deletion ydb/core/kqp/host/kqp_statement_rewrite.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ namespace {
NYql::TExprContext& exprCtx,
NYql::TTypeAnnotationContext& typeCtx,
const TIntrusivePtr<NYql::TKikimrSessionContext>& sessionCtx,
const NMiniKQL::IFunctionRegistry& funcRegistry,
const TString& cluster) {
NYql::NNodes::TExprBase expr(root);
auto maybeWrite = expr.Maybe<NYql::NNodes::TCoWrite>();
Expand Down Expand Up @@ -135,6 +136,7 @@ namespace {

auto typeTransformer = NYql::TTransformationPipeline(&typeCtx)
.AddServiceTransformers()
.AddExpressionEvaluation(funcRegistry)
.AddPreTypeAnnotation()
.AddIOAnnotation()
.AddTypeAnnotationTransformer(CreateKqpTypeAnnotationTransformer(cluster, sessionCtx->TablesPtr(), typeCtx, sessionCtx->ConfigPtr()))
Expand Down Expand Up @@ -428,6 +430,7 @@ TPrepareRewriteInfo PrepareRewrite(
NYql::TExprContext& exprCtx,
NYql::TTypeAnnotationContext& typeCtx,
const TIntrusivePtr<NYql::TKikimrSessionContext>& sessionCtx,
const NMiniKQL::IFunctionRegistry& funcRegistry,
const TString& cluster) {
// CREATE TABLE AS statement can be used only with perstatement execution.
// Thus we assume that there is only one such statement. (it was checked in CheckRewrite)
Expand All @@ -440,7 +443,7 @@ TPrepareRewriteInfo PrepareRewrite(
});
YQL_ENSURE(createTableAsNode);

return PrepareCreateTableAs(createTableAsNode, exprCtx, typeCtx, sessionCtx, cluster);
return PrepareCreateTableAs(createTableAsNode, exprCtx, typeCtx, sessionCtx, funcRegistry, cluster);
}

TVector<NYql::TExprNode::TPtr> RewriteExpression(
Expand Down
1 change: 1 addition & 0 deletions ydb/core/kqp/host/kqp_statement_rewrite.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ TPrepareRewriteInfo PrepareRewrite(
NYql::TExprContext& exprCtx,
NYql::TTypeAnnotationContext& typeCtx,
const TIntrusivePtr<NYql::TKikimrSessionContext>& sessionCtx,
const NMiniKQL::IFunctionRegistry& funcRegistry,
const TString& cluster);

TVector<NYql::TExprNode::TPtr> RewriteExpression(
Expand Down
33 changes: 26 additions & 7 deletions ydb/core/kqp/runtime/kqp_write_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1961,19 +1961,20 @@ class TKqpBufferWriteActor :public TActorBootstrapped<TKqpBufferWriteActor>, pub
return Process();
}

bool Prepare(const ui64 txId, NWilson::TTraceId traceId) {
bool Prepare(std::optional<NWilson::TTraceId> traceId) {
UpdateTracingState("Commit", std::move(traceId));
OperationStartTime = TInstant::Now();

CA_LOG_D("Start prepare for distributed commit");
YQL_ENSURE(State == EState::WRITING);
YQL_ENSURE(!NeedToFlushBeforeCommit);
State = EState::PREPARING;
for (auto& [_, queue] : DataQueues) {
YQL_ENSURE(queue.empty());
}
TxId = txId;
YQL_ENSURE(TxId);
for (auto& [_, info] : WriteInfos) {
info.WriteTableActor->SetPrepare(txId);
info.WriteTableActor->SetPrepare(*TxId);
}
Close();
if (!Process()) {
Expand Down Expand Up @@ -2424,8 +2425,14 @@ class TKqpBufferWriteActor :public TActorBootstrapped<TKqpBufferWriteActor>, pub
TxManager->StartExecute();
ImmediateCommit(std::move(ev->TraceId));
} else {
TxManager->StartPrepare();
Prepare(ev->Get()->TxId, std::move(ev->TraceId));
AFL_ENSURE(ev->Get()->TxId);
TxId = ev->Get()->TxId;
if (NeedToFlushBeforeCommit) {
Flush(std::move(ev->TraceId));
} else {
TxManager->StartPrepare();
Prepare(std::move(ev->TraceId));
}
}
}

Expand Down Expand Up @@ -2805,6 +2812,14 @@ class TKqpBufferWriteActor :public TActorBootstrapped<TKqpBufferWriteActor>, pub
UpdateTracingState("Write", BufferWriteActorSpan.GetTraceId());
OnOperationFinished(Counters->BufferActorFlushLatencyHistogram);
State = EState::WRITING;
AFL_ENSURE(!TxId || NeedToFlushBeforeCommit); // TxId => NeedToFlushBeforeCommit
NeedToFlushBeforeCommit = false;
if (TxId) {
TxManager->StartPrepare();
Prepare(std::nullopt);
return;
}

Send<ESendingType::Tail>(ExecuterActorId, new TEvKqpBuffer::TEvResult{
BuildStats()
});
Expand Down Expand Up @@ -2846,8 +2861,11 @@ class TKqpBufferWriteActor :public TActorBootstrapped<TKqpBufferWriteActor>, pub
ReplyErrorAndDieImpl(statusCode, std::move(issues));
}

void UpdateTracingState(const char* name, NWilson::TTraceId traceId) {
BufferWriteActorStateSpan = NWilson::TSpan(TWilsonKqp::BufferWriteActorState, std::move(traceId),
void UpdateTracingState(const char* name, std::optional<NWilson::TTraceId> traceId) {
if (!traceId) {
return;
}
BufferWriteActorStateSpan = NWilson::TSpan(TWilsonKqp::BufferWriteActorState, std::move(*traceId),
name, NWilson::EFlags::AUTO_END);
if (BufferWriteActorStateSpan.GetTraceId() != BufferWriteActorSpan.GetTraceId()) {
BufferWriteActorStateSpan.Link(BufferWriteActorSpan.GetTraceId());
Expand Down Expand Up @@ -2929,6 +2947,7 @@ class TKqpBufferWriteActor :public TActorBootstrapped<TKqpBufferWriteActor>, pub

EState State;
bool HasError = false;
bool NeedToFlushBeforeCommit = false;
THashMap<TPathId, std::queue<TBufferWriteMessage>> DataQueues;

struct TAckMessage {
Expand Down
4 changes: 2 additions & 2 deletions ydb/core/kqp/session_actor/kqp_query_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,7 @@ class TKqpQueryState : public TNonCopyable {
return ::NKikimr::NKqp::NeedSnapshot(*TxCtx, config, /*rollback*/ false, Commit, PreparedQuery->GetPhysicalQuery());
}

bool ShouldCommitWithCurrentTx(const TKqpPhyTxHolder::TConstPtr& tx, const bool canUseVolatileTx) {
bool ShouldCommitWithCurrentTx(const TKqpPhyTxHolder::TConstPtr& tx) {
const auto& phyQuery = PreparedQuery->GetPhysicalQuery();
if (!Commit) {
return false;
Expand Down Expand Up @@ -376,7 +376,7 @@ class TKqpQueryState : public TNonCopyable {
}

if (TxCtx->NeedUncommittedChangesFlush || AppData()->FeatureFlags.GetEnableForceImmediateEffectsExecution()) {
if (tx && tx->GetHasEffects() && (!HasSinkInsert(tx) || canUseVolatileTx)) {
if (tx && tx->GetHasEffects()) {
YQL_ENSURE(tx->ResultsSize() == 0);
// commit can be applied to the last transaction with effects
return CurrentTx + 1 == phyQuery.TransactionsSize();
Expand Down
13 changes: 2 additions & 11 deletions ydb/core/kqp/session_actor/kqp_session_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -950,10 +950,7 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
}

QueryState->TxCtx->SetTempTables(QueryState->TempTablesState);
QueryState->TxCtx->ApplyPhysicalQuery(
phyQuery,
CanUseVolatileTx(),
QueryState->Commit);
QueryState->TxCtx->ApplyPhysicalQuery(phyQuery, QueryState->Commit);
auto [success, issues] = QueryState->TxCtx->ApplyTableOperations(phyQuery.GetTableOps(), phyQuery.GetTableInfos(),
EKikimrQueryType::Dml);
if (!success) {
Expand Down Expand Up @@ -1195,7 +1192,7 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
ExecutePartitioned(tx);
} else if (QueryState->TxCtx->ShouldExecuteDeferredEffects(tx)) {
ExecuteDeferredEffectsImmediately(tx);
} else if (auto commit = QueryState->ShouldCommitWithCurrentTx(tx, CanUseVolatileTx()); commit || tx) {
} else if (auto commit = QueryState->ShouldCommitWithCurrentTx(tx); commit || tx) {
ExecutePhyTx(tx, commit);
} else {
ReplySuccess();
Expand Down Expand Up @@ -2900,12 +2897,6 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {
}
}

bool CanUseVolatileTx() const {
return AppData()->FeatureFlags.GetEnableDataShardVolatileTransactions()
&& !QueryState->TxCtx->TopicOperations.HasOperations()
&& !QueryState->TxCtx->HasOlapTable;
}

private:
TActorId Owner;
TKqpQueryCachePtr QueryCache;
Expand Down
46 changes: 46 additions & 0 deletions ydb/core/kqp/ut/effects/kqp_effects_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -525,6 +525,52 @@ Y_UNIT_TEST_SUITE(KqpEffects) {
UNIT_ASSERT_VALUES_EQUAL(reads[0]["columns"].GetArraySafe().size(), 3);
}

Y_UNIT_TEST_TWIN(EmptyUpdate, UseSink) {
NKikimrConfig::TAppConfig appConfig;
appConfig.MutableTableServiceConfig()->SetEnableOltpSink(UseSink);
auto settings = TKikimrSettings()
.SetAppConfig(appConfig)
.SetWithSampleTables(false);
TKikimrRunner kikimr(settings);
auto db = kikimr.GetTableClient();
auto session = db.CreateSession().GetValueSync().GetSession();

kikimr.GetTestServer().GetRuntime()->SetLogPriority(NKikimrServices::TX_DATASHARD, NActors::NLog::PRI_TRACE);

{
auto schemeResult = session.ExecuteSchemeQuery(R"(
--!syntax_v1
CREATE TABLE T1 (
Key Uint32,
Value Uint32,
Timestamp Timestamp,
PRIMARY KEY (Key)
);
CREATE TABLE T2 (
Key Uint32,
Value Uint32,
PRIMARY KEY (Key)
);
)").ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(schemeResult.GetStatus(), EStatus::SUCCESS, schemeResult.GetIssues().ToString());
}
Cerr << "!!!UPDATE TABLE" << Endl;
{
auto result = session.ExecuteDataQuery(R"(
--!syntax_v1
$data = SELECT 1u AS Key, 1u AS Value;
UPDATE T1 ON SELECT Key, Value FROM $data;
DELETE FROM T2 WHERE Key = 1;
)", TTxControl::BeginTx().CommitTx()).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
}
Cerr << "!!!DROP TABLE" << Endl;
{
auto schemeResult = session.DropTable("/Root/T1").ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL_C(schemeResult.GetStatus(), EStatus::SUCCESS, schemeResult.GetIssues().ToString());
}
}

Y_UNIT_TEST_TWIN(AlterDuringUpsertTransaction, UseSink) {
NKikimrConfig::TAppConfig appConfig;
appConfig.MutableTableServiceConfig()->SetEnableOltpSink(UseSink);
Expand Down
Loading
Loading