From 81be0df6ad9b76623a2ae6b08510670775318773 Mon Sep 17 00:00:00 2001 From: Nikolay Shumkov Date: Tue, 16 Jan 2024 20:58:51 +0300 Subject: [PATCH 01/10] Initial commit --- .../compile_service/kqp_compile_service.cpp | 115 +++++++++++------- .../kqp/query_data/kqp_prepared_query.cpp | 73 +++++++++++ ydb/core/kqp/query_data/kqp_prepared_query.h | 6 + 3 files changed, 147 insertions(+), 47 deletions(-) diff --git a/ydb/core/kqp/compile_service/kqp_compile_service.cpp b/ydb/core/kqp/compile_service/kqp_compile_service.cpp index 6d6414fca96c..335310a30496 100644 --- a/ydb/core/kqp/compile_service/kqp_compile_service.cpp +++ b/ydb/core/kqp/compile_service/kqp_compile_service.cpp @@ -566,24 +566,26 @@ class TKqpCompileService : public TActorBootstrapped { if (request.Uid) { Counters->ReportCompileRequestGet(dbCounters); - auto compileResult = QueryCache.FindByUid(*request.Uid, request.KeepInCache); - if (compileResult) { - Y_ENSURE(compileResult->Query); - if (compileResult->Query->UserSid == userSid) { - Counters->ReportQueryCacheHit(dbCounters, true); - - LOG_DEBUG_S(ctx, NKikimrServices::KQP_COMPILE_SERVICE, "Served query from cache by uid" - << ", sender: " << ev->Sender - << ", queryUid: " << *request.Uid); - - ReplyFromCache(ev->Sender, compileResult, ctx, ev->Cookie, std::move(ev->Get()->Orbit), std::move(compileServiceSpan)); - return; - } else { - LOG_NOTICE_S(ctx, NKikimrServices::KQP_COMPILE_SERVICE, "Non-matching user sid for query" - << ", sender: " << ev->Sender - << ", queryUid: " << *request.Uid - << ", expected sid: " << compileResult->Query->UserSid - << ", actual sid: " << userSid); + if (!request.TempTablesState || request.TempTablesState->TempTables.empty()) { + auto compileResult = QueryCache.FindByUid(*request.Uid, request.KeepInCache); + if (compileResult) { + Y_ENSURE(compileResult->Query); + if (compileResult->Query->UserSid == userSid) { + Counters->ReportQueryCacheHit(dbCounters, true); + + LOG_DEBUG_S(ctx, NKikimrServices::KQP_COMPILE_SERVICE, "Served query from cache by uid" + << ", sender: " << ev->Sender + << ", queryUid: " << *request.Uid); + + ReplyFromCache(ev->Sender, compileResult, ctx, ev->Cookie, std::move(ev->Get()->Orbit), std::move(compileServiceSpan)); + return; + } else { + LOG_NOTICE_S(ctx, NKikimrServices::KQP_COMPILE_SERVICE, "Non-matching user sid for query" + << ", sender: " << ev->Sender + << ", queryUid: " << *request.Uid + << ", expected sid: " << compileResult->Query->UserSid + << ", actual sid: " << userSid); + } } } @@ -609,16 +611,19 @@ class TKqpCompileService : public TActorBootstrapped { Y_ENSURE(query.UserSid == userSid); } - auto compileResult = QueryCache.FindByQuery(query, request.KeepInCache); - if (compileResult) { - Counters->ReportQueryCacheHit(dbCounters, true); - LOG_DEBUG_S(ctx, NKikimrServices::KQP_COMPILE_SERVICE, "Served query from cache from query text" - << ", sender: " << ev->Sender - << ", queryUid: " << compileResult->Uid); + if (!request.TempTablesState || request.TempTablesState->TempTables.empty()) { + auto compileResult = QueryCache.FindByQuery(query, request.KeepInCache); + if (compileResult) { + Counters->ReportQueryCacheHit(dbCounters, true); - ReplyFromCache(ev->Sender, compileResult, ctx, ev->Cookie, std::move(ev->Get()->Orbit), std::move(compileServiceSpan)); - return; + LOG_DEBUG_S(ctx, NKikimrServices::KQP_COMPILE_SERVICE, "Served query from cache from query text" + << ", sender: " << ev->Sender + << ", queryUid: " << compileResult->Uid); + + ReplyFromCache(ev->Sender, compileResult, ctx, ev->Cookie, std::move(ev->Get()->Orbit), std::move(compileServiceSpan)); + return; + } } CollectDiagnostics = request.CollectDiagnostics; @@ -672,7 +677,11 @@ class TKqpCompileService : public TActorBootstrapped { auto dbCounters = request.DbCounters; Counters->ReportRecompileRequestGet(dbCounters); - auto compileResult = QueryCache.FindByUid(request.Uid, false); + TKqpCompileResult::TConstPtr compileResult = nullptr; + if (!request.TempTablesState || request.TempTablesState->TempTables.empty()) { + compileResult = QueryCache.FindByUid(request.Uid, false); + } + if (compileResult || request.Query) { Counters->ReportCompileRequestCompile(dbCounters); @@ -736,18 +745,26 @@ class TKqpCompileService : public TActorBootstrapped { bool keepInCache = compileRequest.KeepInCache && compileResult->AllowCache; + bool hasTempTables = compileRequest.TempTablesState + && (!compileRequest.TempTablesState->TempTables.empty()); + if (compileResult->PreparedQuery) { + hasTempTables = compileResult->PreparedQuery->HasTempTables(compileRequest.TempTablesState); + } + try { if (compileResult->Status == Ydb::StatusIds::SUCCESS) { - if (QueryCache.FindByUid(compileResult->Uid, false)) { - QueryCache.Replace(compileResult); - } else if (keepInCache) { - if (QueryCache.Insert(compileResult, TableServiceConfig.GetEnableAstCache())) { - Counters->CompileQueryCacheEvicted->Inc(); - } - if (compileResult->Query && compileResult->Query->Settings.IsPrepareQuery) { - if (InsertPreparingQuery(compileResult, compileRequest.KeepInCache)) { + if (!hasTempTables) { + if (QueryCache.FindByUid(compileResult->Uid, false)) { + QueryCache.Replace(compileResult); + } else if (keepInCache) { + if (QueryCache.Insert(compileResult, TableServiceConfig.GetEnableAstCache())) { Counters->CompileQueryCacheEvicted->Inc(); - }; + } + if (compileResult->Query && compileResult->Query->Settings.IsPrepareQuery) { + if (InsertPreparingQuery(compileResult, compileRequest.KeepInCache)) { + Counters->CompileQueryCacheEvicted->Inc(); + }; + } } } @@ -762,8 +779,10 @@ class TKqpCompileService : public TActorBootstrapped { request.Cookie, std::move(request.Orbit), std::move(request.CompileServiceSpan), (CollectDiagnostics ? ev->Get()->ReplayMessageUserView : std::nullopt)); } } else { - if (QueryCache.FindByUid(compileResult->Uid, false)) { - QueryCache.EraseByUid(compileResult->Uid); + if (!hasTempTables) { + if (QueryCache.FindByUid(compileResult->Uid, false)) { + QueryCache.EraseByUid(compileResult->Uid); + } } } @@ -819,18 +838,20 @@ class TKqpCompileService : public TActorBootstrapped { auto& query = ev->Get()->Query; auto compileRequest = RequestsQueue.FinishActiveRequest(query); if (parseResult && parseResult->Ast->IsOk()) { - auto compileResult = QueryCache.FindByAst(query, *parseResult->Ast, compileRequest.KeepInCache); - if (compileResult) { - Counters->ReportQueryCacheHit(compileRequest.DbCounters, true); + if (!compileRequest.TempTablesState || compileRequest.TempTablesState->TempTables.empty()) { + auto compileResult = QueryCache.FindByAst(query, *parseResult->Ast, compileRequest.KeepInCache); + if (compileResult) { + Counters->ReportQueryCacheHit(compileRequest.DbCounters, true); - LOG_DEBUG_S(ctx, NKikimrServices::KQP_COMPILE_SERVICE, "Served query from cache from ast" - << ", sender: " << compileRequest.Sender - << ", queryUid: " << compileResult->Uid); + LOG_DEBUG_S(ctx, NKikimrServices::KQP_COMPILE_SERVICE, "Served query from cache from ast" + << ", sender: " << compileRequest.Sender + << ", queryUid: " << compileResult->Uid); - compileResult->Ast->PgAutoParamValues = std::move(parseResult->Ast->PgAutoParamValues); + compileResult->Ast->PgAutoParamValues = std::move(parseResult->Ast->PgAutoParamValues); - ReplyFromCache(compileRequest.Sender, compileResult, ctx, compileRequest.Cookie, std::move(compileRequest.Orbit), std::move(compileRequest.CompileServiceSpan)); - return; + ReplyFromCache(compileRequest.Sender, compileResult, ctx, compileRequest.Cookie, std::move(compileRequest.Orbit), std::move(compileRequest.CompileServiceSpan)); + return; + } } } Counters->ReportQueryCacheHit(compileRequest.DbCounters, false); diff --git a/ydb/core/kqp/query_data/kqp_prepared_query.cpp b/ydb/core/kqp/query_data/kqp_prepared_query.cpp index 1fe60ca3ec8f..e9913ed476de 100644 --- a/ydb/core/kqp/query_data/kqp_prepared_query.cpp +++ b/ydb/core/kqp/query_data/kqp_prepared_query.cpp @@ -1,5 +1,6 @@ #include "kqp_prepared_query.h" +#include #include #include #include @@ -106,6 +107,49 @@ bool TKqpPhyTxHolder::IsLiteralTx() const { return LiteralTx; } +std::optional, bool>> +TKqpPhyTxHolder::GetSchemeOpTempTablePath() const { + if (GetType() != NKqpProto::TKqpPhyTx::TYPE_SCHEME) { + return std::nullopt; + } + auto& schemeOperation = GetSchemeOperation(); + switch (schemeOperation.GetOperationCase()) { + case NKqpProto::TKqpSchemeOperation::kCreateTable: { + const auto& modifyScheme = schemeOperation.GetCreateTable(); + const NKikimrSchemeOp::TTableDescription* tableDesc = nullptr; + switch (modifyScheme.GetOperationType()) { + case NKikimrSchemeOp::ESchemeOpCreateTable: { + tableDesc = &modifyScheme.GetCreateTable(); + break; + } + case NKikimrSchemeOp::ESchemeOpCreateIndexedTable: { + tableDesc = &modifyScheme.GetCreateIndexedTable().GetTableDescription(); + break; + } + default: + return std::nullopt; + } + if (tableDesc->HasTemporary()) { + if (tableDesc->GetTemporary()) { + return {{{modifyScheme.GetWorkingDir(), tableDesc->GetName()}, + true}}; + } + } + break; + } + case NKqpProto::TKqpSchemeOperation::kDropTable: { + auto modifyScheme = schemeOperation.GetDropTable(); + auto* dropTable = modifyScheme.MutableDrop(); + + return {{{modifyScheme.GetWorkingDir(), dropTable->GetName()}, + false}}; + } + default: + return std::nullopt; + } + return std::nullopt; +} + const NKikimr::NKqp::TStagePredictor& TKqpPhyTxHolder::GetCalculationPredictor(const size_t stageIdx) const { YQL_ENSURE(stageIdx < Predictors.size(), "incorrect stage idx for predictor"); return Predictors[stageIdx]; @@ -226,6 +270,35 @@ void TPreparedQueryHolder::FillTables(const google::protobuf::RepeatedPtrField< } } +bool TPreparedQueryHolder::HasTempTables(TKqpTempTablesState::TConstPtr tempTablesState) const { + auto tempTables = THashSet(); + for (const auto& [path, info] : tempTablesState->TempTables) { + tempTables.insert(path.second + *tempTablesState->SessionId); + } + for (const auto& table: QueryTables) { + if (tempTables.contains(table)) { + return true; + } + } + + for (const auto& tx: Transactions) { + auto optPath = tx->GetSchemeOpTempTablePath(); + if (!optPath) { + continue; + } else { + const auto& [path, isCreate] = *optPath; + if (isCreate) { + return true; + } else { + if (tempTables.contains(JoinPath({path.first, path.second}))) { + return true; + } + } + } + } + return false; +} + const TKqpPhyTxHolder::TConstPtr& TPreparedQueryHolder::GetPhyTx(ui32 txId) const { YQL_ENSURE(txId < Transactions.size()); return Transactions[txId]; diff --git a/ydb/core/kqp/query_data/kqp_prepared_query.h b/ydb/core/kqp/query_data/kqp_prepared_query.h index 719c14594eff..c0f194ba0315 100644 --- a/ydb/core/kqp/query_data/kqp_prepared_query.h +++ b/ydb/core/kqp/query_data/kqp_prepared_query.h @@ -1,5 +1,6 @@ #pragma once +#include #include #include #include @@ -115,6 +116,9 @@ class TKqpPhyTxHolder { const std::shared_ptr& alloc, TIntrusivePtr tableConstInfoById); bool IsLiteralTx() const; + + std::optional, bool>> + GetSchemeOpTempTablePath() const; }; class TLlvmSettings { @@ -187,6 +191,8 @@ class TPreparedQueryHolder { void FillTable(const NKqpProto::TKqpPhyTable& phyTable); void FillTables(const google::protobuf::RepeatedPtrField< ::NKqpProto::TKqpPhyStage>& stages); + + bool HasTempTables(TKqpTempTablesState::TConstPtr tempTablesState) const; }; From 86a52e9f20e5d63d6f68a72302d17549827c7fb4 Mon Sep 17 00:00:00 2001 From: Nikolay Shumkov Date: Thu, 18 Jan 2024 15:49:01 +0300 Subject: [PATCH 02/10] Fixes --- .../compile_service/kqp_compile_service.cpp | 139 ++++++++++-------- .../kqp/query_data/kqp_prepared_query.cpp | 3 + 2 files changed, 80 insertions(+), 62 deletions(-) diff --git a/ydb/core/kqp/compile_service/kqp_compile_service.cpp b/ydb/core/kqp/compile_service/kqp_compile_service.cpp index 335310a30496..b852ed84769d 100644 --- a/ydb/core/kqp/compile_service/kqp_compile_service.cpp +++ b/ydb/core/kqp/compile_service/kqp_compile_service.cpp @@ -388,6 +388,11 @@ class TKqpRequestsQueue { }; class TKqpCompileService : public TActorBootstrapped { + enum ECacheType { + ByUid, + ByQuery, + ByAst, + }; public: static constexpr NKikimrServices::TActivity::EType ActorActivityType() { return NKikimrServices::TActivity::KQP_COMPILE_SERVICE; @@ -566,26 +571,25 @@ class TKqpCompileService : public TActorBootstrapped { if (request.Uid) { Counters->ReportCompileRequestGet(dbCounters); - if (!request.TempTablesState || request.TempTablesState->TempTables.empty()) { - auto compileResult = QueryCache.FindByUid(*request.Uid, request.KeepInCache); - if (compileResult) { - Y_ENSURE(compileResult->Query); - if (compileResult->Query->UserSid == userSid) { - Counters->ReportQueryCacheHit(dbCounters, true); - - LOG_DEBUG_S(ctx, NKikimrServices::KQP_COMPILE_SERVICE, "Served query from cache by uid" - << ", sender: " << ev->Sender - << ", queryUid: " << *request.Uid); - - ReplyFromCache(ev->Sender, compileResult, ctx, ev->Cookie, std::move(ev->Get()->Orbit), std::move(compileServiceSpan)); - return; - } else { - LOG_NOTICE_S(ctx, NKikimrServices::KQP_COMPILE_SERVICE, "Non-matching user sid for query" - << ", sender: " << ev->Sender - << ", queryUid: " << *request.Uid - << ", expected sid: " << compileResult->Query->UserSid - << ", actual sid: " << userSid); - } + auto compileResult = QueryCache.FindByUid(*request.Uid, request.KeepInCache); + compileResult = WithCache(std::move(compileResult), request.TempTablesState); + if (compileResult) { + Y_ENSURE(compileResult->Query); + if (compileResult->Query->UserSid == userSid) { + Counters->ReportQueryCacheHit(dbCounters, true); + + LOG_DEBUG_S(ctx, NKikimrServices::KQP_COMPILE_SERVICE, "Served query from cache by uid" + << ", sender: " << ev->Sender + << ", queryUid: " << *request.Uid); + + ReplyFromCache(ev->Sender, compileResult, ctx, ev->Cookie, std::move(ev->Get()->Orbit), std::move(compileServiceSpan)); + return; + } else { + LOG_NOTICE_S(ctx, NKikimrServices::KQP_COMPILE_SERVICE, "Non-matching user sid for query" + << ", sender: " << ev->Sender + << ", queryUid: " << *request.Uid + << ", expected sid: " << compileResult->Query->UserSid + << ", actual sid: " << userSid); } } @@ -611,19 +615,18 @@ class TKqpCompileService : public TActorBootstrapped { Y_ENSURE(query.UserSid == userSid); } + auto compileResult = QueryCache.FindByQuery(query, request.KeepInCache); + compileResult = WithCache(std::move(compileResult), request.TempTablesState); - if (!request.TempTablesState || request.TempTablesState->TempTables.empty()) { - auto compileResult = QueryCache.FindByQuery(query, request.KeepInCache); - if (compileResult) { - Counters->ReportQueryCacheHit(dbCounters, true); + if (compileResult) { + Counters->ReportQueryCacheHit(dbCounters, true); - LOG_DEBUG_S(ctx, NKikimrServices::KQP_COMPILE_SERVICE, "Served query from cache from query text" - << ", sender: " << ev->Sender - << ", queryUid: " << compileResult->Uid); + LOG_DEBUG_S(ctx, NKikimrServices::KQP_COMPILE_SERVICE, "Served query from cache from query text" + << ", sender: " << ev->Sender + << ", queryUid: " << compileResult->Uid); - ReplyFromCache(ev->Sender, compileResult, ctx, ev->Cookie, std::move(ev->Get()->Orbit), std::move(compileServiceSpan)); - return; - } + ReplyFromCache(ev->Sender, compileResult, ctx, ev->Cookie, std::move(ev->Get()->Orbit), std::move(compileServiceSpan)); + return; } CollectDiagnostics = request.CollectDiagnostics; @@ -677,10 +680,8 @@ class TKqpCompileService : public TActorBootstrapped { auto dbCounters = request.DbCounters; Counters->ReportRecompileRequestGet(dbCounters); - TKqpCompileResult::TConstPtr compileResult = nullptr; - if (!request.TempTablesState || request.TempTablesState->TempTables.empty()) { - compileResult = QueryCache.FindByUid(request.Uid, false); - } + TKqpCompileResult::TConstPtr compileResult = QueryCache.FindByUid(request.Uid, false); + compileResult = WithCache(std::move(compileResult), request.TempTablesState); if (compileResult || request.Query) { Counters->ReportCompileRequestCompile(dbCounters); @@ -745,27 +746,12 @@ class TKqpCompileService : public TActorBootstrapped { bool keepInCache = compileRequest.KeepInCache && compileResult->AllowCache; - bool hasTempTables = compileRequest.TempTablesState - && (!compileRequest.TempTablesState->TempTables.empty()); - if (compileResult->PreparedQuery) { - hasTempTables = compileResult->PreparedQuery->HasTempTables(compileRequest.TempTablesState); - } + bool hasTempTables = WithCache(compileResult, compileRequest.TempTablesState) != nullptr; try { if (compileResult->Status == Ydb::StatusIds::SUCCESS) { if (!hasTempTables) { - if (QueryCache.FindByUid(compileResult->Uid, false)) { - QueryCache.Replace(compileResult); - } else if (keepInCache) { - if (QueryCache.Insert(compileResult, TableServiceConfig.GetEnableAstCache())) { - Counters->CompileQueryCacheEvicted->Inc(); - } - if (compileResult->Query && compileResult->Query->Settings.IsPrepareQuery) { - if (InsertPreparingQuery(compileResult, compileRequest.KeepInCache)) { - Counters->CompileQueryCacheEvicted->Inc(); - }; - } - } + UpdateQueryCache(compileResult, keepInCache); } if (ev->Get()->ReplayMessage) { @@ -833,25 +819,54 @@ class TKqpCompileService : public TActorBootstrapped { StartCheckQueriesTtlTimer(); } + TKqpCompileResult::TConstPtr WithCache( + TKqpCompileResult::TConstPtr cacheResult, TKqpTempTablesState::TConstPtr tempTablesState) { + if (!cacheResult) { + return nullptr; + } + if (!cacheResult->PreparedQuery) { + return cacheResult; + } + auto hasTempTables = cacheResult->PreparedQuery->HasTempTables(tempTablesState); + if (hasTempTables) { + return nullptr; + } + return cacheResult; + } + + void UpdateQueryCache(TKqpCompileResult::TConstPtr compileResult, bool keepInCache) { + if (QueryCache.FindByUid(compileResult->Uid, false)) { + QueryCache.Replace(compileResult); + } else if (keepInCache) { + if (QueryCache.Insert(compileResult, TableServiceConfig.GetEnableAstCache())) { + Counters->CompileQueryCacheEvicted->Inc(); + } + if (compileResult->Query && compileResult->Query->Settings.IsPrepareQuery) { + if (InsertPreparingQuery(compileResult, true)) { + Counters->CompileQueryCacheEvicted->Inc(); + }; + } + } + } + void Handle(TEvKqp::TEvParseResponse::TPtr& ev, const TActorContext& ctx) { auto& parseResult = ev->Get()->AstResult; auto& query = ev->Get()->Query; auto compileRequest = RequestsQueue.FinishActiveRequest(query); if (parseResult && parseResult->Ast->IsOk()) { - if (!compileRequest.TempTablesState || compileRequest.TempTablesState->TempTables.empty()) { - auto compileResult = QueryCache.FindByAst(query, *parseResult->Ast, compileRequest.KeepInCache); - if (compileResult) { - Counters->ReportQueryCacheHit(compileRequest.DbCounters, true); + auto compileResult = QueryCache.FindByAst(query, *parseResult->Ast, compileRequest.KeepInCache); + compileResult = WithCache(std::move(compileResult), compileRequest.TempTablesState); + if (compileResult) { + Counters->ReportQueryCacheHit(compileRequest.DbCounters, true); - LOG_DEBUG_S(ctx, NKikimrServices::KQP_COMPILE_SERVICE, "Served query from cache from ast" - << ", sender: " << compileRequest.Sender - << ", queryUid: " << compileResult->Uid); + LOG_DEBUG_S(ctx, NKikimrServices::KQP_COMPILE_SERVICE, "Served query from cache from ast" + << ", sender: " << compileRequest.Sender + << ", queryUid: " << compileResult->Uid); - compileResult->Ast->PgAutoParamValues = std::move(parseResult->Ast->PgAutoParamValues); + compileResult->Ast->PgAutoParamValues = std::move(parseResult->Ast->PgAutoParamValues); - ReplyFromCache(compileRequest.Sender, compileResult, ctx, compileRequest.Cookie, std::move(compileRequest.Orbit), std::move(compileRequest.CompileServiceSpan)); - return; - } + ReplyFromCache(compileRequest.Sender, compileResult, ctx, compileRequest.Cookie, std::move(compileRequest.Orbit), std::move(compileRequest.CompileServiceSpan)); + return; } } Counters->ReportQueryCacheHit(compileRequest.DbCounters, false); diff --git a/ydb/core/kqp/query_data/kqp_prepared_query.cpp b/ydb/core/kqp/query_data/kqp_prepared_query.cpp index e9913ed476de..021ee85901f7 100644 --- a/ydb/core/kqp/query_data/kqp_prepared_query.cpp +++ b/ydb/core/kqp/query_data/kqp_prepared_query.cpp @@ -271,6 +271,9 @@ void TPreparedQueryHolder::FillTables(const google::protobuf::RepeatedPtrField< } bool TPreparedQueryHolder::HasTempTables(TKqpTempTablesState::TConstPtr tempTablesState) const { + if (!tempTablesState) { + return false; + } auto tempTables = THashSet(); for (const auto& [path, info] : tempTablesState->TempTables) { tempTables.insert(path.second + *tempTablesState->SessionId); From c5a2ea5745362d212dd8407cd5bc3c1e512794d7 Mon Sep 17 00:00:00 2001 From: Nikolay Shumkov Date: Thu, 18 Jan 2024 15:50:24 +0300 Subject: [PATCH 03/10] Fixes --- ydb/core/kqp/compile_service/kqp_compile_service.cpp | 5 ----- 1 file changed, 5 deletions(-) diff --git a/ydb/core/kqp/compile_service/kqp_compile_service.cpp b/ydb/core/kqp/compile_service/kqp_compile_service.cpp index b852ed84769d..e1e4cfb335b2 100644 --- a/ydb/core/kqp/compile_service/kqp_compile_service.cpp +++ b/ydb/core/kqp/compile_service/kqp_compile_service.cpp @@ -388,11 +388,6 @@ class TKqpRequestsQueue { }; class TKqpCompileService : public TActorBootstrapped { - enum ECacheType { - ByUid, - ByQuery, - ByAst, - }; public: static constexpr NKikimrServices::TActivity::EType ActorActivityType() { return NKikimrServices::TActivity::KQP_COMPILE_SERVICE; From 774012eb1ade4ad88ed0bdfcd626087374d93843 Mon Sep 17 00:00:00 2001 From: Nikolay Shumkov Date: Thu, 18 Jan 2024 15:56:06 +0300 Subject: [PATCH 04/10] Fixes --- ydb/core/kqp/compile_service/kqp_compile_service.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ydb/core/kqp/compile_service/kqp_compile_service.cpp b/ydb/core/kqp/compile_service/kqp_compile_service.cpp index e1e4cfb335b2..e60a89840eaf 100644 --- a/ydb/core/kqp/compile_service/kqp_compile_service.cpp +++ b/ydb/core/kqp/compile_service/kqp_compile_service.cpp @@ -741,7 +741,7 @@ class TKqpCompileService : public TActorBootstrapped { bool keepInCache = compileRequest.KeepInCache && compileResult->AllowCache; - bool hasTempTables = WithCache(compileResult, compileRequest.TempTablesState) != nullptr; + bool hasTempTables = WithCache(compileResult, compileRequest.TempTablesState) == nullptr; try { if (compileResult->Status == Ydb::StatusIds::SUCCESS) { From 237c9127b5ade72a82fc938e3b691eb06528419b Mon Sep 17 00:00:00 2001 From: Nikolay Shumkov Date: Fri, 19 Jan 2024 18:23:03 +0300 Subject: [PATCH 05/10] Fixes --- .../compile_service/kqp_compile_service.cpp | 35 ++- .../executer_actor/kqp_scheme_executer.cpp | 6 +- .../kqp/query_data/kqp_prepared_query.cpp | 11 +- ydb/core/kqp/query_data/kqp_prepared_query.h | 2 +- .../kqp/session_actor/kqp_session_actor.cpp | 63 ++--- ydb/core/kqp/ut/pg/kqp_pg_ut.cpp | 244 ++++++++++++++++++ 6 files changed, 309 insertions(+), 52 deletions(-) diff --git a/ydb/core/kqp/compile_service/kqp_compile_service.cpp b/ydb/core/kqp/compile_service/kqp_compile_service.cpp index e60a89840eaf..63f4fac772e3 100644 --- a/ydb/core/kqp/compile_service/kqp_compile_service.cpp +++ b/ydb/core/kqp/compile_service/kqp_compile_service.cpp @@ -741,7 +741,7 @@ class TKqpCompileService : public TActorBootstrapped { bool keepInCache = compileRequest.KeepInCache && compileResult->AllowCache; - bool hasTempTables = WithCache(compileResult, compileRequest.TempTablesState) == nullptr; + bool hasTempTables = WithCache(compileResult, compileRequest.TempTablesState, true) == nullptr; try { if (compileResult->Status == Ydb::StatusIds::SUCCESS) { @@ -815,18 +815,35 @@ class TKqpCompileService : public TActorBootstrapped { } TKqpCompileResult::TConstPtr WithCache( - TKqpCompileResult::TConstPtr cacheResult, TKqpTempTablesState::TConstPtr tempTablesState) { - if (!cacheResult) { + TKqpCompileResult::TConstPtr compileResult, + TKqpTempTablesState::TConstPtr tempTablesState, bool forInsert = false) { + if (!compileResult) { return nullptr; } - if (!cacheResult->PreparedQuery) { - return cacheResult; + if (!compileResult->PreparedQuery) { + return compileResult; } - auto hasTempTables = cacheResult->PreparedQuery->HasTempTables(tempTablesState); - if (hasTempTables) { - return nullptr; + if (forInsert) { + auto hasTempTables = compileResult->PreparedQuery->HasTempTables(tempTablesState); + if (hasTempTables) { + return nullptr; + } + return compileResult; + } + if (!tempTablesState) { + return compileResult; + } + auto tables = compileResult->PreparedQuery->GetQueryTables(); + auto tempTables = THashSet(); + for (const auto& [path, info] : tempTablesState->TempTables) { + tempTables.insert(path.second); + } + for (const auto& path: tables) { + if (tempTables.contains(path)) { + return nullptr; + } } - return cacheResult; + return compileResult; } void UpdateQueryCache(TKqpCompileResult::TConstPtr compileResult, bool keepInCache) { diff --git a/ydb/core/kqp/executer_actor/kqp_scheme_executer.cpp b/ydb/core/kqp/executer_actor/kqp_scheme_executer.cpp index 450229c2f7b2..aa5028518bed 100644 --- a/ydb/core/kqp/executer_actor/kqp_scheme_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_scheme_executer.cpp @@ -113,11 +113,7 @@ class TKqpSchemeExecuter : public TActorBootstrapped { } case NKqpProto::TKqpSchemeOperation::kDropTable: { - auto modifyScheme = schemeOp.GetDropTable(); - if (Temporary) { - auto* dropTable = modifyScheme.MutableDrop(); - dropTable->SetName(dropTable->GetName() + SessionId); - } + const auto& modifyScheme = schemeOp.GetDropTable(); ev->Record.MutableTransaction()->MutableModifyScheme()->CopyFrom(modifyScheme); break; } diff --git a/ydb/core/kqp/query_data/kqp_prepared_query.cpp b/ydb/core/kqp/query_data/kqp_prepared_query.cpp index 021ee85901f7..808568ba8ed8 100644 --- a/ydb/core/kqp/query_data/kqp_prepared_query.cpp +++ b/ydb/core/kqp/query_data/kqp_prepared_query.cpp @@ -107,7 +107,7 @@ bool TKqpPhyTxHolder::IsLiteralTx() const { return LiteralTx; } -std::optional, bool>> +std::optional>> TKqpPhyTxHolder::GetSchemeOpTempTablePath() const { if (GetType() != NKqpProto::TKqpPhyTx::TYPE_SCHEME) { return std::nullopt; @@ -131,8 +131,7 @@ TKqpPhyTxHolder::GetSchemeOpTempTablePath() const { } if (tableDesc->HasTemporary()) { if (tableDesc->GetTemporary()) { - return {{{modifyScheme.GetWorkingDir(), tableDesc->GetName()}, - true}}; + return {{true, {modifyScheme.GetWorkingDir(), tableDesc->GetName()}}}; } } break; @@ -141,8 +140,7 @@ TKqpPhyTxHolder::GetSchemeOpTempTablePath() const { auto modifyScheme = schemeOperation.GetDropTable(); auto* dropTable = modifyScheme.MutableDrop(); - return {{{modifyScheme.GetWorkingDir(), dropTable->GetName()}, - false}}; + return {{false, {modifyScheme.GetWorkingDir(), dropTable->GetName()}}}; } default: return std::nullopt; @@ -274,6 +272,7 @@ bool TPreparedQueryHolder::HasTempTables(TKqpTempTablesState::TConstPtr tempTabl if (!tempTablesState) { return false; } + YQL_ENSURE(tempTablesState->SessionId); auto tempTables = THashSet(); for (const auto& [path, info] : tempTablesState->TempTables) { tempTables.insert(path.second + *tempTablesState->SessionId); @@ -289,7 +288,7 @@ bool TPreparedQueryHolder::HasTempTables(TKqpTempTablesState::TConstPtr tempTabl if (!optPath) { continue; } else { - const auto& [path, isCreate] = *optPath; + const auto& [isCreate, path] = *optPath; if (isCreate) { return true; } else { diff --git a/ydb/core/kqp/query_data/kqp_prepared_query.h b/ydb/core/kqp/query_data/kqp_prepared_query.h index c0f194ba0315..680bbe7a823d 100644 --- a/ydb/core/kqp/query_data/kqp_prepared_query.h +++ b/ydb/core/kqp/query_data/kqp_prepared_query.h @@ -117,7 +117,7 @@ class TKqpPhyTxHolder { bool IsLiteralTx() const; - std::optional, bool>> + std::optional>> GetSchemeOpTempTablePath() const; }; diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp index 133634d3a49b..bfd92fc083b9 100644 --- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp +++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp @@ -1160,39 +1160,34 @@ class TKqpSessionActor : public TActorBootstrapped { } } - std::optional GetTemporaryTableInfo(TKqpPhyTxHolder::TConstPtr tx) { + std::optional> + GetTemporaryTableInfo(TKqpPhyTxHolder::TConstPtr tx) { if (!tx) { return std::nullopt; } - const auto& schemeOperation = tx->GetSchemeOperation(); - switch (schemeOperation.GetOperationCase()) { - case NKqpProto::TKqpSchemeOperation::kCreateTable: { - const auto& modifyScheme = schemeOperation.GetCreateTable(); - const NKikimrSchemeOp::TTableDescription* tableDesc = nullptr; - switch (modifyScheme.GetOperationType()) { - case NKikimrSchemeOp::ESchemeOpCreateTable: { - tableDesc = &modifyScheme.GetCreateTable(); - break; - } - case NKikimrSchemeOp::ESchemeOpCreateIndexedTable: { - tableDesc = &modifyScheme.GetCreateIndexedTable().GetTableDescription(); - break; - } - default: - YQL_ENSURE(false, "Unexpected operation type"); - } - auto userToken = QueryState ? QueryState->UserToken : TIntrusiveConstPtr(); - if (tableDesc->HasTemporary()) { - if (tableDesc->GetTemporary()) { - return {{tableDesc->GetName(), modifyScheme.GetWorkingDir(), Settings.Cluster, userToken, Settings.Database}}; - } - } - break; - } - default: - return std::nullopt; + auto optPath = tx->GetSchemeOpTempTablePath(); + if (!optPath) { + return std::nullopt; } - return std::nullopt; + const auto& [isCreate, path] = *optPath; + if (isCreate) { + auto userToken = QueryState ? QueryState->UserToken : TIntrusiveConstPtr(); + return {{true, {path.second, path.first, Settings.Cluster, userToken, Settings.Database}}}; + } + + TString name = path.second; + auto pos = name.find(*TempTablesState.SessionId); + + if (pos == TString::npos) { + return std::nullopt; + } + name.erase(pos, name.size()); + + auto it = TempTablesState.TempTables.find(std::make_pair(Settings.Cluster, JoinPath({path.first, name}))); + if (it == TempTablesState.TempTables.end()) { + return std::nullopt; + } + return {{false, it->second}}; } void UpdateTempTablesState() { @@ -1203,8 +1198,14 @@ class TKqpSessionActor : public TActorBootstrapped { if (!tx) { return; } - if (auto tempTableInfo = GetTemporaryTableInfo(tx)) { - TempTablesState.TempTables[std::make_pair(tempTableInfo->Database, JoinPath({tempTableInfo->WorkingDir, tempTableInfo->Name}))] = std::move(*tempTableInfo); + auto optInfo = GetTemporaryTableInfo(tx); + if (optInfo) { + auto [isCreate, tempTableInfo] = *optInfo; + if (isCreate) { + TempTablesState.TempTables[std::make_pair(tempTableInfo.Database, JoinPath({tempTableInfo.WorkingDir, tempTableInfo.Name}))] = tempTableInfo; + } else { + TempTablesState.TempTables.erase(std::make_pair(tempTableInfo.Database, JoinPath({tempTableInfo.WorkingDir, tempTableInfo.Name}))); + } QueryState->UpdateTempTablesState(TempTablesState); } } diff --git a/ydb/core/kqp/ut/pg/kqp_pg_ut.cpp b/ydb/core/kqp/ut/pg/kqp_pg_ut.cpp index 5f32e98be209..32c8a3d88ccd 100644 --- a/ydb/core/kqp/ut/pg/kqp_pg_ut.cpp +++ b/ydb/core/kqp/ut/pg/kqp_pg_ut.cpp @@ -2251,6 +2251,250 @@ Y_UNIT_TEST_SUITE(KqpPg) { } } + Y_UNIT_TEST(TempTablesWithCache) { + NKikimrConfig::TAppConfig appConfig; + appConfig.MutableTableServiceConfig()->SetEnablePreparedDdl(true); + auto setting = NKikimrKqp::TKqpSetting(); + auto serverSettings = TKikimrSettings() + .SetAppConfig(appConfig) + .SetKqpSettings({setting}); + TKikimrRunner kikimr( + serverSettings.SetWithSampleTables(false).SetEnableTempTables(true)); + auto clientConfig = NGRpcProxy::TGRpcClientConfig(kikimr.GetEndpoint()); + auto client = kikimr.GetQueryClient(); + + auto settings = NYdb::NQuery::TExecuteQuerySettings() + .Syntax(NYdb::NQuery::ESyntax::Pg) + .StatsMode(NYdb::NQuery::EStatsMode::Basic); + { + auto session = client.GetSession().GetValueSync().GetSession(); + auto id = session.GetId(); + { + const auto query = Q_(R"( + --!syntax_pg + CREATE TABLE PgTemp ( + key int2 PRIMARY KEY, + value int2))"); + + auto result = + session.ExecuteQuery(query, NYdb::NQuery::TTxControl::NoTx(), settings).ExtractValueSync(); + UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); + auto stats = NYdb::TProtoAccessor::GetProto(*result.GetStats()); + UNIT_ASSERT_VALUES_EQUAL(stats.compilation().from_cache(), false); + } + + { + const auto query = Q_(R"( + --!syntax_pg + DROP TABLE PgTemp; + )"); + + auto result = + session.ExecuteQuery(query, NYdb::NQuery::TTxControl::NoTx(), settings).ExtractValueSync(); + UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); + auto stats = NYdb::TProtoAccessor::GetProto(*result.GetStats()); + UNIT_ASSERT_VALUES_EQUAL(stats.compilation().from_cache(), false); + } + + { + const auto query = Q_(R"( + --!syntax_pg + CREATE TABLE PgTemp ( + key int2 PRIMARY KEY, + value int2))"); + + auto result = + session.ExecuteQuery(query, NYdb::NQuery::TTxControl::NoTx(), settings).ExtractValueSync(); + UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); + auto stats = NYdb::TProtoAccessor::GetProto(*result.GetStats()); + UNIT_ASSERT_VALUES_EQUAL(stats.compilation().from_cache(), false); + + auto resultInsert = session.ExecuteQuery(R"( + INSERT INTO PgTemp VALUES(1, 1); + )", NYdb::NQuery::TTxControl::BeginTx().CommitTx(), settings).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C( + resultInsert.GetStatus(), EStatus::SUCCESS, resultInsert.GetIssues().ToString()); + } + + { + const auto query = Q_(R"( + --!syntax_pg + CREATE TABLE SimpleTable ( + key int2 PRIMARY KEY, + value int2))"); + + auto result = + session.ExecuteQuery(query, NYdb::NQuery::TTxControl::NoTx(), settings).ExtractValueSync(); + UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); + auto stats = NYdb::TProtoAccessor::GetProto(*result.GetStats()); + UNIT_ASSERT_VALUES_EQUAL(stats.compilation().from_cache(), false); + } + + { + const auto query = Q_(R"( + --!syntax_pg + SELECT * FROM PgTemp; + )"); + + auto result = session.ExecuteQuery( + query, NYdb::NQuery::TTxControl::BeginTx().CommitTx(), settings).ExtractValueSync(); + UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); + auto stats = NYdb::TProtoAccessor::GetProto(*result.GetStats()); + UNIT_ASSERT_VALUES_EQUAL(stats.compilation().from_cache(), false); + } + + { + const auto query = Q_(R"( + --!syntax_pg + SELECT * FROM PgTemp; + )"); + + auto result = session.ExecuteQuery( + query, NYdb::NQuery::TTxControl::BeginTx().CommitTx(), settings).ExtractValueSync(); + UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); + auto stats = NYdb::TProtoAccessor::GetProto(*result.GetStats()); + UNIT_ASSERT_VALUES_EQUAL(stats.compilation().from_cache(), true); + } + + { + const auto query = Q_(R"( + --!syntax_pg + SELECT * FROM SimpleTable; + )"); + + auto result = session.ExecuteQuery( + query, NYdb::NQuery::TTxControl::BeginTx().CommitTx(), settings).ExtractValueSync(); + UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); + auto stats = NYdb::TProtoAccessor::GetProto(*result.GetStats()); + UNIT_ASSERT_VALUES_EQUAL(stats.compilation().from_cache(), false); + } + + { + const auto query = Q_(R"( + --!syntax_pg + SELECT * FROM SimpleTable; + )"); + + auto result = session.ExecuteQuery( + query, NYdb::NQuery::TTxControl::BeginTx().CommitTx(), settings).ExtractValueSync(); + UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); + auto stats = NYdb::TProtoAccessor::GetProto(*result.GetStats()); + UNIT_ASSERT_VALUES_EQUAL(stats.compilation().from_cache(), true); + } + + { + const auto query = Q_(R"( + --!syntax_pg + CREATE TEMP TABLE PgTemp ( + key int2 PRIMARY KEY, + value int2))"); + + auto result = + session.ExecuteQuery(query, NYdb::NQuery::TTxControl::NoTx(), settings).ExtractValueSync(); + UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); + auto stats = NYdb::TProtoAccessor::GetProto(*result.GetStats()); + UNIT_ASSERT_VALUES_EQUAL(stats.compilation().from_cache(), false); + + auto resultInsert = session.ExecuteQuery(R"( + INSERT INTO PgTemp VALUES(2, 2); + )", NYdb::NQuery::TTxControl::BeginTx().CommitTx(), settings).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C( + resultInsert.GetStatus(), EStatus::SUCCESS, resultInsert.GetIssues().ToString()); + } + + { + const auto query = Q_(R"( + --!syntax_pg + SELECT * FROM PgTemp; + )"); + + auto result = session.ExecuteQuery( + query, NYdb::NQuery::TTxControl::BeginTx().CommitTx(), settings).ExtractValueSync(); + UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); + auto stats = NYdb::TProtoAccessor::GetProto(*result.GetStats()); + UNIT_ASSERT_VALUES_EQUAL(stats.compilation().from_cache(), false); + + UNIT_ASSERT_C(!result.GetResultSets().empty(), "results are empty"); + CompareYson(R"( + [["2";"2"]] + )", FormatResultSetYson(result.GetResultSet(0))); + } + + { + const auto query = Q_(R"( + --!syntax_pg + SELECT * FROM SimpleTable; + )"); + + auto result = session.ExecuteQuery( + query, NYdb::NQuery::TTxControl::BeginTx().CommitTx(), settings).ExtractValueSync(); + UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); + auto stats = NYdb::TProtoAccessor::GetProto(*result.GetStats()); + UNIT_ASSERT_VALUES_EQUAL(stats.compilation().from_cache(), true); + } + + { + const auto query = Q_(R"( + --!syntax_pg + DROP TABLE PgTemp; + )"); + + auto result = + session.ExecuteQuery(query, NYdb::NQuery::TTxControl::NoTx(), settings).ExtractValueSync(); + UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); + auto stats = NYdb::TProtoAccessor::GetProto(*result.GetStats()); + UNIT_ASSERT_VALUES_EQUAL(stats.compilation().from_cache(), false); + } + + { + const auto query = Q_(R"( + --!syntax_pg + SELECT * FROM PgTemp; + )"); + + auto result = session.ExecuteQuery( + query, NYdb::NQuery::TTxControl::BeginTx().CommitTx(), settings).ExtractValueSync(); + UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); + auto stats = NYdb::TProtoAccessor::GetProto(*result.GetStats()); + UNIT_ASSERT_VALUES_EQUAL(stats.compilation().from_cache(), true); + + UNIT_ASSERT_C(!result.GetResultSets().empty(), "results are empty"); + CompareYson(R"( + [["1";"1"]] + )", FormatResultSetYson(result.GetResultSet(0))); + } + + { + const auto query = Q_(R"( + --!syntax_pg + DROP TABLE PgTemp; + )"); + + auto result = + session.ExecuteQuery(query, NYdb::NQuery::TTxControl::NoTx(), settings).ExtractValueSync(); + UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); + auto stats = NYdb::TProtoAccessor::GetProto(*result.GetStats()); + UNIT_ASSERT_VALUES_EQUAL(stats.compilation().from_cache(), true); + } + + bool allDoneOk = true; + NTestHelpers::CheckDelete(clientConfig, id, Ydb::StatusIds::SUCCESS, allDoneOk); + + UNIT_ASSERT(allDoneOk); + } + + { + const auto querySelect = Q_(R"( + --!syntax_pg + SELECT * FROM PgTemp; + )"); + + auto resultSelect = client.ExecuteQuery( + querySelect, NYdb::NQuery::TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + UNIT_ASSERT(!resultSelect.IsSuccess()); + } + } + Y_UNIT_TEST(ValuesInsert) { TKikimrRunner kikimr(NKqp::TKikimrSettings().SetWithSampleTables(false)); auto testSingleType = [&kikimr] (const TPgTypeTestSpec& spec) { From 44b942bda379a09992a536cb25bbb935f11470a8 Mon Sep 17 00:00:00 2001 From: Nikolay Shumkov Date: Fri, 19 Jan 2024 18:43:15 +0300 Subject: [PATCH 06/10] Fixes --- ydb/core/kqp/ut/pg/kqp_pg_ut.cpp | 28 +--------------------------- 1 file changed, 1 insertion(+), 27 deletions(-) diff --git a/ydb/core/kqp/ut/pg/kqp_pg_ut.cpp b/ydb/core/kqp/ut/pg/kqp_pg_ut.cpp index 32c8a3d88ccd..253afa09a87e 100644 --- a/ydb/core/kqp/ut/pg/kqp_pg_ut.cpp +++ b/ydb/core/kqp/ut/pg/kqp_pg_ut.cpp @@ -2343,19 +2343,6 @@ Y_UNIT_TEST_SUITE(KqpPg) { UNIT_ASSERT_VALUES_EQUAL(stats.compilation().from_cache(), false); } - { - const auto query = Q_(R"( - --!syntax_pg - SELECT * FROM PgTemp; - )"); - - auto result = session.ExecuteQuery( - query, NYdb::NQuery::TTxControl::BeginTx().CommitTx(), settings).ExtractValueSync(); - UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); - auto stats = NYdb::TProtoAccessor::GetProto(*result.GetStats()); - UNIT_ASSERT_VALUES_EQUAL(stats.compilation().from_cache(), true); - } - { const auto query = Q_(R"( --!syntax_pg @@ -2369,19 +2356,6 @@ Y_UNIT_TEST_SUITE(KqpPg) { UNIT_ASSERT_VALUES_EQUAL(stats.compilation().from_cache(), false); } - { - const auto query = Q_(R"( - --!syntax_pg - SELECT * FROM SimpleTable; - )"); - - auto result = session.ExecuteQuery( - query, NYdb::NQuery::TTxControl::BeginTx().CommitTx(), settings).ExtractValueSync(); - UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); - auto stats = NYdb::TProtoAccessor::GetProto(*result.GetStats()); - UNIT_ASSERT_VALUES_EQUAL(stats.compilation().from_cache(), true); - } - { const auto query = Q_(R"( --!syntax_pg @@ -2474,7 +2448,7 @@ Y_UNIT_TEST_SUITE(KqpPg) { session.ExecuteQuery(query, NYdb::NQuery::TTxControl::NoTx(), settings).ExtractValueSync(); UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); auto stats = NYdb::TProtoAccessor::GetProto(*result.GetStats()); - UNIT_ASSERT_VALUES_EQUAL(stats.compilation().from_cache(), true); + UNIT_ASSERT_VALUES_EQUAL(stats.compilation().from_cache(), false); } bool allDoneOk = true; From 23faac5b992b0f9054ac7348da9f2b6b4054c3e1 Mon Sep 17 00:00:00 2001 From: Nikolay Shumkov Date: Wed, 24 Jan 2024 04:24:02 +0300 Subject: [PATCH 07/10] Fixes --- ydb/core/kqp/common/simple/temp_tables.cpp | 17 ++++++ ydb/core/kqp/common/simple/temp_tables.h | 10 ++-- .../compile_service/kqp_compile_service.cpp | 53 ++++++++----------- ydb/core/kqp/gateway/kqp_metadata_loader.cpp | 18 +++---- ydb/core/kqp/provider/yql_kikimr_provider.cpp | 25 +++++---- ydb/core/kqp/provider/yql_kikimr_provider.h | 37 ++++++------- .../kqp/query_data/kqp_prepared_query.cpp | 32 +++++------ ydb/core/kqp/query_data/kqp_prepared_query.h | 2 +- .../kqp/session_actor/kqp_session_actor.cpp | 35 ++++++------ .../kqp/session_actor/kqp_session_actor.h | 3 +- .../session_actor/kqp_temp_tables_manager.cpp | 14 ++--- 11 files changed, 125 insertions(+), 121 deletions(-) diff --git a/ydb/core/kqp/common/simple/temp_tables.cpp b/ydb/core/kqp/common/simple/temp_tables.cpp index 3f7645d2de8a..4beb776d1dc5 100644 --- a/ydb/core/kqp/common/simple/temp_tables.cpp +++ b/ydb/core/kqp/common/simple/temp_tables.cpp @@ -2,4 +2,21 @@ namespace NKikimr::NKqp { +THashMap::const_iterator +TKqpTempTablesState::FindInfo(const std::string_view& path, bool withSessionId) const { + if (!withSessionId) { + return TempTables.find(path); + } + + if (path.size() < SessionId.size()) { + return TempTables.end(); + } + size_t pos = path.size() - SessionId.size(); + if (path.substr(pos) != SessionId) { + return TempTables.end(); + } + + return TempTables.find(path.substr(0, pos)); +} + } // namespace NKikimr::NKqp diff --git a/ydb/core/kqp/common/simple/temp_tables.h b/ydb/core/kqp/common/simple/temp_tables.h index 2f3bb049e2aa..320e9ce82940 100644 --- a/ydb/core/kqp/common/simple/temp_tables.h +++ b/ydb/core/kqp/common/simple/temp_tables.h @@ -3,6 +3,7 @@ #include #include +#include #include #include @@ -14,14 +15,15 @@ struct TKqpTempTablesState { struct TTempTableInfo { TString Name; TString WorkingDir; - TString Database; TIntrusiveConstPtr UserToken; - TString Cluster; }; - std::optional SessionId; - THashMap, TTempTableInfo> TempTables; + TString SessionId; + THashMap TempTables; using TConstPtr = std::shared_ptr; + + THashMap::const_iterator + FindInfo(const std::string_view& path, bool withSessionId = false) const; }; } // namespace NKikimr::NKqp diff --git a/ydb/core/kqp/compile_service/kqp_compile_service.cpp b/ydb/core/kqp/compile_service/kqp_compile_service.cpp index 63f4fac772e3..34b3074d9bca 100644 --- a/ydb/core/kqp/compile_service/kqp_compile_service.cpp +++ b/ydb/core/kqp/compile_service/kqp_compile_service.cpp @@ -567,7 +567,9 @@ class TKqpCompileService : public TActorBootstrapped { Counters->ReportCompileRequestGet(dbCounters); auto compileResult = QueryCache.FindByUid(*request.Uid, request.KeepInCache); - compileResult = WithCache(std::move(compileResult), request.TempTablesState); + if (HasTempTablesNameClashes(compileResult, request.TempTablesState)) { + compileResult = nullptr; + } if (compileResult) { Y_ENSURE(compileResult->Query); if (compileResult->Query->UserSid == userSid) { @@ -611,7 +613,9 @@ class TKqpCompileService : public TActorBootstrapped { } auto compileResult = QueryCache.FindByQuery(query, request.KeepInCache); - compileResult = WithCache(std::move(compileResult), request.TempTablesState); + if (HasTempTablesNameClashes(compileResult, request.TempTablesState)) { + compileResult = nullptr; + } if (compileResult) { Counters->ReportQueryCacheHit(dbCounters, true); @@ -676,7 +680,9 @@ class TKqpCompileService : public TActorBootstrapped { Counters->ReportRecompileRequestGet(dbCounters); TKqpCompileResult::TConstPtr compileResult = QueryCache.FindByUid(request.Uid, false); - compileResult = WithCache(std::move(compileResult), request.TempTablesState); + if (HasTempTablesNameClashes(compileResult, request.TempTablesState)) { + compileResult = nullptr; + } if (compileResult || request.Query) { Counters->ReportCompileRequestCompile(dbCounters); @@ -741,11 +747,11 @@ class TKqpCompileService : public TActorBootstrapped { bool keepInCache = compileRequest.KeepInCache && compileResult->AllowCache; - bool hasTempTables = WithCache(compileResult, compileRequest.TempTablesState, true) == nullptr; + bool hasTempTablesNameClashes = HasTempTablesNameClashes(compileResult, compileRequest.TempTablesState, true); try { if (compileResult->Status == Ydb::StatusIds::SUCCESS) { - if (!hasTempTables) { + if (!hasTempTablesNameClashes) { UpdateQueryCache(compileResult, keepInCache); } @@ -760,7 +766,7 @@ class TKqpCompileService : public TActorBootstrapped { request.Cookie, std::move(request.Orbit), std::move(request.CompileServiceSpan), (CollectDiagnostics ? ev->Get()->ReplayMessageUserView : std::nullopt)); } } else { - if (!hasTempTables) { + if (!hasTempTablesNameClashes) { if (QueryCache.FindByUid(compileResult->Uid, false)) { QueryCache.EraseByUid(compileResult->Uid); } @@ -814,36 +820,17 @@ class TKqpCompileService : public TActorBootstrapped { StartCheckQueriesTtlTimer(); } - TKqpCompileResult::TConstPtr WithCache( + bool HasTempTablesNameClashes( TKqpCompileResult::TConstPtr compileResult, - TKqpTempTablesState::TConstPtr tempTablesState, bool forInsert = false) { + TKqpTempTablesState::TConstPtr tempTablesState, bool withSessionId = false) { if (!compileResult) { - return nullptr; + return false; } if (!compileResult->PreparedQuery) { - return compileResult; - } - if (forInsert) { - auto hasTempTables = compileResult->PreparedQuery->HasTempTables(tempTablesState); - if (hasTempTables) { - return nullptr; - } - return compileResult; - } - if (!tempTablesState) { - return compileResult; - } - auto tables = compileResult->PreparedQuery->GetQueryTables(); - auto tempTables = THashSet(); - for (const auto& [path, info] : tempTablesState->TempTables) { - tempTables.insert(path.second); - } - for (const auto& path: tables) { - if (tempTables.contains(path)) { - return nullptr; - } + return false; } - return compileResult; + + return compileResult->PreparedQuery->HasTempTables(tempTablesState, withSessionId); } void UpdateQueryCache(TKqpCompileResult::TConstPtr compileResult, bool keepInCache) { @@ -867,7 +854,9 @@ class TKqpCompileService : public TActorBootstrapped { auto compileRequest = RequestsQueue.FinishActiveRequest(query); if (parseResult && parseResult->Ast->IsOk()) { auto compileResult = QueryCache.FindByAst(query, *parseResult->Ast, compileRequest.KeepInCache); - compileResult = WithCache(std::move(compileResult), compileRequest.TempTablesState); + if (HasTempTablesNameClashes(compileResult, compileRequest.TempTablesState)) { + compileResult = nullptr; + } if (compileResult) { Counters->ReportQueryCacheHit(compileRequest.DbCounters, true); diff --git a/ydb/core/kqp/gateway/kqp_metadata_loader.cpp b/ydb/core/kqp/gateway/kqp_metadata_loader.cpp index c8a7f7038cde..e368f3f40ad9 100644 --- a/ydb/core/kqp/gateway/kqp_metadata_loader.cpp +++ b/ydb/core/kqp/gateway/kqp_metadata_loader.cpp @@ -30,16 +30,16 @@ struct NavigateEntryResult { std::optional QueryName; }; -NavigateEntryResult CreateNavigateEntry(const TString& cluster, const TString& path, - const NYql::IKikimrGateway::TLoadTableMetadataSettings& settings, TKqpTempTablesState::TConstPtr tempTablesState = nullptr) { +NavigateEntryResult CreateNavigateEntry(const TString& path, + const NYql::IKikimrGateway::TLoadTableMetadataSettings& settings, TKqpTempTablesState::TConstPtr tempTablesState = nullptr) { TNavigate::TEntry entry; TString currentPath = path; std::optional queryName = std::nullopt; if (tempTablesState) { - auto tempTablesIt = tempTablesState->TempTables.find(std::make_pair(cluster, currentPath)); - if (tempTablesState->SessionId && tempTablesIt != tempTablesState->TempTables.end()) { + auto tempTablesInfoIt = tempTablesState->FindInfo(currentPath, false); + if (tempTablesInfoIt != tempTablesState->TempTables.end()) { queryName = currentPath; - currentPath = currentPath + *tempTablesState->SessionId; + currentPath = currentPath + tempTablesState->SessionId; } } entry.Path = SplitPath(currentPath); @@ -50,10 +50,8 @@ NavigateEntryResult CreateNavigateEntry(const TString& cluster, const TString& p return {entry, currentPath, queryName}; } -NavigateEntryResult CreateNavigateEntry(const TString& cluster, - const std::pair& pair, +NavigateEntryResult CreateNavigateEntry(const std::pair& pair, const NYql::IKikimrGateway::TLoadTableMetadataSettings& settings, TKqpTempTablesState::TConstPtr tempTablesState = nullptr) { - Y_UNUSED(cluster); Y_UNUSED(tempTablesState); TNavigate::TEntry entry; @@ -701,8 +699,8 @@ NThreading::TFuture TKqpTableMetadataLoader::LoadTableMeta const auto externalEntryItem = CreateNavigateExternalEntry(id, settings.WithExternalDatasources_); Y_ABORT_UNLESS(!settings.WithExternalDatasources_ || externalEntryItem, "External data source must be resolved using path only"); - auto resNavigate = settings.WithExternalDatasources_ ? *externalEntryItem : CreateNavigateEntry(cluster, - id, settings, TempTablesState); + auto resNavigate = settings.WithExternalDatasources_ ? *externalEntryItem : CreateNavigateEntry(id, + settings, TempTablesState); const auto entry = resNavigate.Entry; const auto queryName = resNavigate.QueryName; const auto externalEntry = settings.WithExternalDatasources_ ? std::optional{} : externalEntryItem; diff --git a/ydb/core/kqp/provider/yql_kikimr_provider.cpp b/ydb/core/kqp/provider/yql_kikimr_provider.cpp index b432ce8484df..7ed4e07c7235 100644 --- a/ydb/core/kqp/provider/yql_kikimr_provider.cpp +++ b/ydb/core/kqp/provider/yql_kikimr_provider.cpp @@ -121,11 +121,11 @@ struct TKikimrData { const TKikimrTableDescription* TKikimrTablesData::EnsureTableExists(const TString& cluster, const TString& table, TPositionHandle pos, TExprContext& ctx) const { - auto tempTable = TempTables.FindPtr(table); + auto tempTableInfoIt = TempTablesState->FindInfo(table, true); auto tablePath = table; - if (tempTable) { - tablePath = *tempTable; + if (tempTableInfoIt != TempTablesState->TempTables.end()) { + tablePath = tempTableInfoIt->first; } auto desc = Tables.FindPtr(std::make_pair(cluster, tablePath)); @@ -141,11 +141,11 @@ const TKikimrTableDescription* TKikimrTablesData::EnsureTableExists(const TStrin } TKikimrTableDescription& TKikimrTablesData::GetOrAddTable(const TString& cluster, const TString& database, const TString& table, ETableType tableType) { - auto tempTable = TempTables.FindPtr(table); + auto tempTableInfoIt = TempTablesState->FindInfo(table, true); auto tablePath = table; - if (tempTable) { - tablePath = *tempTable; + if (tempTableInfoIt != TempTablesState->TempTables.end()) { + tablePath = tempTableInfoIt->first; } if (!Tables.FindPtr(std::make_pair(cluster, tablePath))) { @@ -165,11 +165,11 @@ TKikimrTableDescription& TKikimrTablesData::GetOrAddTable(const TString& cluster } TKikimrTableDescription& TKikimrTablesData::GetTable(const TString& cluster, const TString& table) { - auto tempTable = TempTables.FindPtr(table); + auto tempTableInfoIt = TempTablesState->FindInfo(table, true); auto tablePath = table; - if (tempTable) { - tablePath = *tempTable; + if (tempTableInfoIt != TempTablesState->TempTables.end()) { + tablePath = tempTableInfoIt->first; } auto desc = Tables.FindPtr(std::make_pair(cluster, tablePath)); @@ -181,12 +181,11 @@ TKikimrTableDescription& TKikimrTablesData::GetTable(const TString& cluster, con const TKikimrTableDescription& TKikimrTablesData::ExistingTable(const TStringBuf& cluster, const TStringBuf& table) const { - auto tempTable = TempTables.FindPtr(table); + auto tempTableInfoIt = TempTablesState->FindInfo(table, true); auto tablePath = table; - - if (tempTable) { - tablePath = *tempTable; + if (tempTableInfoIt != TempTablesState->TempTables.end()) { + tablePath = tempTableInfoIt->first; } auto desc = Tables.FindPtr(std::make_pair(TString(cluster), TString(tablePath))); diff --git a/ydb/core/kqp/provider/yql_kikimr_provider.h b/ydb/core/kqp/provider/yql_kikimr_provider.h index 2798b50e5efb..abc98f548e8a 100644 --- a/ydb/core/kqp/provider/yql_kikimr_provider.h +++ b/ydb/core/kqp/provider/yql_kikimr_provider.h @@ -200,16 +200,12 @@ class TKikimrTablesData : public TThrRefBase { } void SetTempTables(NKikimr::NKqp::TKqpTempTablesState::TConstPtr tempTablesState) { - if (tempTablesState) { - for (const auto& [path, info] : tempTablesState->TempTables) { - TempTables[path.second + *tempTablesState->SessionId] = path.second; - } - } + TempTablesState = std::move(tempTablesState); } private: THashMap, TKikimrTableDescription> Tables; - THashMap TempTables; + NKikimr::NKqp::TKqpTempTablesState::TConstPtr TempTablesState; }; enum class TYdbOperation : ui32 { @@ -288,11 +284,7 @@ class TKikimrTransactionContextBase : public TThrRefBase { } void SetTempTables(NKikimr::NKqp::TKqpTempTablesState::TConstPtr tempTablesState) { - if (tempTablesState) { - for (const auto& [path, info] : tempTablesState->TempTables) { - TempTables[path.second] = path.second + *tempTablesState->SessionId; - } - } + TempTablesState = std::move(tempTablesState); } template @@ -331,16 +323,25 @@ class TKikimrTransactionContextBase : public TThrRefBase { for (const auto& op : operations) { const auto& table = [&]() -> const TString& { - const auto tempTable = TempTables.FindPtr(op.GetTable()); - if (tempTable) { - return *tempTable; - } else { - return op.GetTable(); + auto tempTableInfoIt = TempTablesState->FindInfo(op.GetTable(), false); + + if (tempTableInfoIt != TempTablesState->TempTables.end()) { + return tempTableInfoIt->first + TempTablesState->SessionId; } + return op.GetTable(); }(); const auto newOp = TYdbOperation(op.GetOperation()); + auto newOp = TYdbOperation(op.GetOperation()); + TPosition pos(op.GetPosition().GetColumn(), op.GetPosition().GetRow()); + + auto tempTableInfoIt = TempTablesState->FindInfo(table, false); + + if (tempTableInfoIt != TempTablesState->TempTables.end()) { + table = tempTableInfoIt->first + TempTablesState->SessionId; + } + const auto info = tableInfoMap.FindPtr(table); if (!info) { TString message = TStringBuilder() @@ -450,7 +451,7 @@ class TKikimrTransactionContextBase : public TThrRefBase { THashMap TableOperations; THashMap TableByIdMap; TMaybe EffectiveIsolationLevel; - THashMap TempTables; + NKikimr::NKqp::TKqpTempTablesState::TConstPtr TempTablesState; bool Readonly = false; bool Invalidated = false; bool Closed = false; @@ -535,7 +536,7 @@ class TKikimrSessionContext : public TThrRefBase { if (TxCtx) { TxCtx->SetTempTables(tempTablesState); } - TempTablesState = tempTablesState; + TempTablesState = std::move(tempTablesState); } const TIntrusiveConstPtr& GetUserToken() const { diff --git a/ydb/core/kqp/query_data/kqp_prepared_query.cpp b/ydb/core/kqp/query_data/kqp_prepared_query.cpp index 808568ba8ed8..5bc367698160 100644 --- a/ydb/core/kqp/query_data/kqp_prepared_query.cpp +++ b/ydb/core/kqp/query_data/kqp_prepared_query.cpp @@ -268,32 +268,32 @@ void TPreparedQueryHolder::FillTables(const google::protobuf::RepeatedPtrField< } } -bool TPreparedQueryHolder::HasTempTables(TKqpTempTablesState::TConstPtr tempTablesState) const { +bool TPreparedQueryHolder::HasTempTables(TKqpTempTablesState::TConstPtr tempTablesState, bool withSessionId) const { if (!tempTablesState) { return false; } - YQL_ENSURE(tempTablesState->SessionId); - auto tempTables = THashSet(); - for (const auto& [path, info] : tempTablesState->TempTables) { - tempTables.insert(path.second + *tempTablesState->SessionId); - } for (const auto& table: QueryTables) { - if (tempTables.contains(table)) { + auto infoIt = tempTablesState->FindInfo(table, withSessionId); + if (infoIt != tempTablesState->TempTables.end()) { return true; } } - for (const auto& tx: Transactions) { - auto optPath = tx->GetSchemeOpTempTablePath(); - if (!optPath) { - continue; - } else { - const auto& [isCreate, path] = *optPath; - if (isCreate) { - return true; + + if (withSessionId) { + for (const auto& tx: Transactions) { + auto optPath = tx->GetSchemeOpTempTablePath(); + if (!optPath) { + continue; } else { - if (tempTables.contains(JoinPath({path.first, path.second}))) { + const auto& [isCreate, path] = *optPath; + if (isCreate) { return true; + } else { + auto infoIt = tempTablesState->FindInfo(JoinPath({path.first, path.second}), withSessionId); + if (infoIt != tempTablesState->TempTables.end()) { + return true; + } } } } diff --git a/ydb/core/kqp/query_data/kqp_prepared_query.h b/ydb/core/kqp/query_data/kqp_prepared_query.h index 680bbe7a823d..e43a49c83af5 100644 --- a/ydb/core/kqp/query_data/kqp_prepared_query.h +++ b/ydb/core/kqp/query_data/kqp_prepared_query.h @@ -192,7 +192,7 @@ class TPreparedQueryHolder { void FillTables(const google::protobuf::RepeatedPtrField< ::NKqpProto::TKqpPhyStage>& stages); - bool HasTempTables(TKqpTempTablesState::TConstPtr tempTablesState) const; + bool HasTempTables(TKqpTempTablesState::TConstPtr tempTablesState, bool withSessionId) const; }; diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp index bfd92fc083b9..0a1c9eb0145e 100644 --- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp +++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp @@ -180,8 +180,11 @@ class TKqpSessionActor : public TActorBootstrapped { FillSettings.Format = IDataProvider::EResultFormat::Custom; FillSettings.FormatDetails = TString(KikimrMkqlProtoFormat); - TempTablesState.SessionId = TryDecodeYdbSessionId(SessionId); - LOG_D("Create session actor with id " << *TempTablesState.SessionId); + auto optSessionId = TryDecodeYdbSessionId(SessionId); + YQL_ENSURE(optSessionId, "Can't decode ydb session Id"); + + TempTablesState.SessionId = *optSessionId; + LOG_D("Create session actor with id " << TempTablesState.SessionId); } void Bootstrap() { @@ -1063,7 +1066,7 @@ class TKqpSessionActor : public TActorBootstrapped { bool temporary = GetTemporaryTableInfo(tx).has_value(); auto executerActor = CreateKqpSchemeExecuter(tx, QueryState->GetType(), SelfId(), requestType, Settings.Database, userToken, - temporary, *TempTablesState.SessionId, QueryState->UserRequestContext); + temporary, TempTablesState.SessionId, QueryState->UserRequestContext); ExecuterId = RegisterWithSameMailbox(executerActor); } @@ -1160,7 +1163,7 @@ class TKqpSessionActor : public TActorBootstrapped { } } - std::optional> + std::optional>> GetTemporaryTableInfo(TKqpPhyTxHolder::TConstPtr tx) { if (!tx) { return std::nullopt; @@ -1172,22 +1175,15 @@ class TKqpSessionActor : public TActorBootstrapped { const auto& [isCreate, path] = *optPath; if (isCreate) { auto userToken = QueryState ? QueryState->UserToken : TIntrusiveConstPtr(); - return {{true, {path.second, path.first, Settings.Cluster, userToken, Settings.Database}}}; - } - - TString name = path.second; - auto pos = name.find(*TempTablesState.SessionId); - - if (pos == TString::npos) { - return std::nullopt; + return {{true, {JoinPath({path.first, path.second}), {path.second, path.first, userToken}}}}; } - name.erase(pos, name.size()); - auto it = TempTablesState.TempTables.find(std::make_pair(Settings.Cluster, JoinPath({path.first, name}))); + auto it = TempTablesState.FindInfo(JoinPath({path.first, path.second})); if (it == TempTablesState.TempTables.end()) { return std::nullopt; } - return {{false, it->second}}; + + return {{false, {it->first, {}}}}; } void UpdateTempTablesState() { @@ -1200,11 +1196,11 @@ class TKqpSessionActor : public TActorBootstrapped { } auto optInfo = GetTemporaryTableInfo(tx); if (optInfo) { - auto [isCreate, tempTableInfo] = *optInfo; + auto [isCreate, info] = *optInfo; if (isCreate) { - TempTablesState.TempTables[std::make_pair(tempTableInfo.Database, JoinPath({tempTableInfo.WorkingDir, tempTableInfo.Name}))] = tempTableInfo; + TempTablesState.TempTables[info.first] = info.second; } else { - TempTablesState.TempTables.erase(std::make_pair(tempTableInfo.Database, JoinPath({tempTableInfo.WorkingDir, tempTableInfo.Name}))); + TempTablesState.TempTables.erase(info.first); } QueryState->UpdateTempTablesState(TempTablesState); } @@ -1883,7 +1879,8 @@ class TKqpSessionActor : public TActorBootstrapped { Become(&TKqpSessionActor::FinalCleanupState); LOG_D("Cleanup temp tables: " << TempTablesState.TempTables.size()); - auto tempTablesManager = CreateKqpTempTablesManager(std::move(TempTablesState), SelfId()); + auto tempTablesManager = CreateKqpTempTablesManager( + std::move(TempTablesState), SelfId(), Settings.Database); RegisterWithSameMailbox(tempTablesManager); return; } else { diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.h b/ydb/core/kqp/session_actor/kqp_session_actor.h index 77920f816486..68214d534856 100644 --- a/ydb/core/kqp/session_actor/kqp_session_actor.h +++ b/ydb/core/kqp/session_actor/kqp_session_actor.h @@ -41,6 +41,7 @@ IActor* CreateKqpSessionActor(const TActorId& owner, const TString& sessionId, const NKikimrConfig::TMetadataProviderConfig& metadataProviderConfig ); -IActor* CreateKqpTempTablesManager(TKqpTempTablesState tempTablesState, const TActorId& target); +IActor* CreateKqpTempTablesManager( + TKqpTempTablesState tempTablesState, const TActorId& target, const TString& database); } // namespace NKikimr::NKqp diff --git a/ydb/core/kqp/session_actor/kqp_temp_tables_manager.cpp b/ydb/core/kqp/session_actor/kqp_temp_tables_manager.cpp index 17e6a78b142b..d7d13ad49828 100644 --- a/ydb/core/kqp/session_actor/kqp_temp_tables_manager.cpp +++ b/ydb/core/kqp/session_actor/kqp_temp_tables_manager.cpp @@ -39,9 +39,10 @@ class TKqpTempTablesManager : public TActorBootstrapped { return NKikimrServices::TActivity::KQP_SESSION_ACTOR; } - TKqpTempTablesManager(TKqpTempTablesState tempTablesState, const TActorId& target) + TKqpTempTablesManager(TKqpTempTablesState tempTablesState, const TActorId& target, const TString& database) : TempTablesState(std::move(tempTablesState)) , Target(target) + , Database(database) {} void Bootstrap() { @@ -51,7 +52,7 @@ class TKqpTempTablesManager : public TActorBootstrapped { auto ev = MakeHolder(); auto& record = ev->Record; - record.SetDatabaseName(info.Database); + record.SetDatabaseName(Database); if (info.UserToken) { record.SetUserToken(info.UserToken->GetSerializedToken()); } @@ -60,9 +61,7 @@ class TKqpTempTablesManager : public TActorBootstrapped { modifyScheme->SetWorkingDir(info.WorkingDir); modifyScheme->SetOperationType(NKikimrSchemeOp::EOperationType::ESchemeOpDropTable); auto* drop = modifyScheme->MutableDrop(); - if (TempTablesState.SessionId) { - drop->SetName(info.Name + *TempTablesState.SessionId); - } + drop->SetName(info.Name + TempTablesState.SessionId); auto promise = NewPromise(); IActor* requestHandler = new TSchemeOpRequestHandler(ev.Release(), promise, true); @@ -107,14 +106,15 @@ class TKqpTempTablesManager : public TActorBootstrapped { private: TKqpTempTablesState TempTablesState; const TActorId Target; + const TString Database; ui32 ResultsCount = 0; }; } // namespace -IActor* CreateKqpTempTablesManager(TKqpTempTablesState tempTablesState, const TActorId& target) +IActor* CreateKqpTempTablesManager(TKqpTempTablesState tempTablesState, const TActorId& target, const TString& database) { - return new TKqpTempTablesManager(tempTablesState, target); + return new TKqpTempTablesManager(tempTablesState, target, database); } } // namespace NKikimr::NKqp From 214ec96febfcc56dca1f3273977b4ddabcb262d8 Mon Sep 17 00:00:00 2001 From: Nikolay Shumkov Date: Wed, 24 Jan 2024 04:57:25 +0300 Subject: [PATCH 08/10] Fixes --- ydb/core/kqp/provider/yql_kikimr_provider.cpp | 40 +++++++++++-------- ydb/core/kqp/provider/yql_kikimr_provider.h | 8 ++-- .../kqp/session_actor/kqp_session_actor.cpp | 2 +- 3 files changed, 30 insertions(+), 20 deletions(-) diff --git a/ydb/core/kqp/provider/yql_kikimr_provider.cpp b/ydb/core/kqp/provider/yql_kikimr_provider.cpp index 7ed4e07c7235..28965c15b825 100644 --- a/ydb/core/kqp/provider/yql_kikimr_provider.cpp +++ b/ydb/core/kqp/provider/yql_kikimr_provider.cpp @@ -121,11 +121,13 @@ struct TKikimrData { const TKikimrTableDescription* TKikimrTablesData::EnsureTableExists(const TString& cluster, const TString& table, TPositionHandle pos, TExprContext& ctx) const { - auto tempTableInfoIt = TempTablesState->FindInfo(table, true); - auto tablePath = table; - if (tempTableInfoIt != TempTablesState->TempTables.end()) { - tablePath = tempTableInfoIt->first; + if (TempTablesState) { + auto tempTableInfoIt = TempTablesState->FindInfo(table, true); + + if (tempTableInfoIt != TempTablesState->TempTables.end()) { + tablePath = tempTableInfoIt->first; + } } auto desc = Tables.FindPtr(std::make_pair(cluster, tablePath)); @@ -141,11 +143,13 @@ const TKikimrTableDescription* TKikimrTablesData::EnsureTableExists(const TStrin } TKikimrTableDescription& TKikimrTablesData::GetOrAddTable(const TString& cluster, const TString& database, const TString& table, ETableType tableType) { - auto tempTableInfoIt = TempTablesState->FindInfo(table, true); - auto tablePath = table; - if (tempTableInfoIt != TempTablesState->TempTables.end()) { - tablePath = tempTableInfoIt->first; + if (TempTablesState) { + auto tempTableInfoIt = TempTablesState->FindInfo(table, true); + + if (tempTableInfoIt != TempTablesState->TempTables.end()) { + tablePath = tempTableInfoIt->first; + } } if (!Tables.FindPtr(std::make_pair(cluster, tablePath))) { @@ -165,11 +169,13 @@ TKikimrTableDescription& TKikimrTablesData::GetOrAddTable(const TString& cluster } TKikimrTableDescription& TKikimrTablesData::GetTable(const TString& cluster, const TString& table) { - auto tempTableInfoIt = TempTablesState->FindInfo(table, true); - auto tablePath = table; - if (tempTableInfoIt != TempTablesState->TempTables.end()) { - tablePath = tempTableInfoIt->first; + if (TempTablesState) { + auto tempTableInfoIt = TempTablesState->FindInfo(table, true); + + if (tempTableInfoIt != TempTablesState->TempTables.end()) { + tablePath = tempTableInfoIt->first; + } } auto desc = Tables.FindPtr(std::make_pair(cluster, tablePath)); @@ -181,11 +187,13 @@ TKikimrTableDescription& TKikimrTablesData::GetTable(const TString& cluster, con const TKikimrTableDescription& TKikimrTablesData::ExistingTable(const TStringBuf& cluster, const TStringBuf& table) const { - auto tempTableInfoIt = TempTablesState->FindInfo(table, true); - auto tablePath = table; - if (tempTableInfoIt != TempTablesState->TempTables.end()) { - tablePath = tempTableInfoIt->first; + if (TempTablesState) { + auto tempTableInfoIt = TempTablesState->FindInfo(table, true); + + if (tempTableInfoIt != TempTablesState->TempTables.end()) { + tablePath = tempTableInfoIt->first; + } } auto desc = Tables.FindPtr(std::make_pair(TString(cluster), TString(tablePath))); diff --git a/ydb/core/kqp/provider/yql_kikimr_provider.h b/ydb/core/kqp/provider/yql_kikimr_provider.h index abc98f548e8a..ecb0153a3f29 100644 --- a/ydb/core/kqp/provider/yql_kikimr_provider.h +++ b/ydb/core/kqp/provider/yql_kikimr_provider.h @@ -336,10 +336,12 @@ class TKikimrTransactionContextBase : public TThrRefBase { auto newOp = TYdbOperation(op.GetOperation()); TPosition pos(op.GetPosition().GetColumn(), op.GetPosition().GetRow()); - auto tempTableInfoIt = TempTablesState->FindInfo(table, false); + if (TempTablesState) { + auto tempTableInfoIt = TempTablesState->FindInfo(table, false); - if (tempTableInfoIt != TempTablesState->TempTables.end()) { - table = tempTableInfoIt->first + TempTablesState->SessionId; + if (tempTableInfoIt != TempTablesState->TempTables.end()) { + table = tempTableInfoIt->first + TempTablesState->SessionId; + } } const auto info = tableInfoMap.FindPtr(table); diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp index 0a1c9eb0145e..1ff33679fd32 100644 --- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp +++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp @@ -1178,7 +1178,7 @@ class TKqpSessionActor : public TActorBootstrapped { return {{true, {JoinPath({path.first, path.second}), {path.second, path.first, userToken}}}}; } - auto it = TempTablesState.FindInfo(JoinPath({path.first, path.second})); + auto it = TempTablesState.FindInfo(JoinPath({path.first, path.second}), true); if (it == TempTablesState.TempTables.end()) { return std::nullopt; } From c088e0fad44958c48710e6ec14b9d7fb8c345e75 Mon Sep 17 00:00:00 2001 From: Nikolay Shumkov Date: Wed, 24 Jan 2024 16:40:07 +0300 Subject: [PATCH 09/10] Fixes --- ydb/core/kqp/provider/yql_kikimr_provider.h | 11 ----------- 1 file changed, 11 deletions(-) diff --git a/ydb/core/kqp/provider/yql_kikimr_provider.h b/ydb/core/kqp/provider/yql_kikimr_provider.h index ecb0153a3f29..facb3a557d77 100644 --- a/ydb/core/kqp/provider/yql_kikimr_provider.h +++ b/ydb/core/kqp/provider/yql_kikimr_provider.h @@ -333,17 +333,6 @@ class TKikimrTransactionContextBase : public TThrRefBase { const auto newOp = TYdbOperation(op.GetOperation()); - auto newOp = TYdbOperation(op.GetOperation()); - TPosition pos(op.GetPosition().GetColumn(), op.GetPosition().GetRow()); - - if (TempTablesState) { - auto tempTableInfoIt = TempTablesState->FindInfo(table, false); - - if (tempTableInfoIt != TempTablesState->TempTables.end()) { - table = tempTableInfoIt->first + TempTablesState->SessionId; - } - } - const auto info = tableInfoMap.FindPtr(table); if (!info) { TString message = TStringBuilder() From 021801db97e21e244ab53e1dfefc901149b60c80 Mon Sep 17 00:00:00 2001 From: Nikolay Shumkov Date: Wed, 24 Jan 2024 17:53:16 +0300 Subject: [PATCH 10/10] Fixes --- ydb/core/kqp/provider/yql_kikimr_provider.h | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/ydb/core/kqp/provider/yql_kikimr_provider.h b/ydb/core/kqp/provider/yql_kikimr_provider.h index facb3a557d77..abcabe0c416e 100644 --- a/ydb/core/kqp/provider/yql_kikimr_provider.h +++ b/ydb/core/kqp/provider/yql_kikimr_provider.h @@ -322,16 +322,15 @@ class TKikimrTransactionContextBase : public TThrRefBase { } for (const auto& op : operations) { - const auto& table = [&]() -> const TString& { - auto tempTableInfoIt = TempTablesState->FindInfo(op.GetTable(), false); + const auto newOp = TYdbOperation(op.GetOperation()); + auto table = op.GetTable(); + if (TempTablesState) { + auto tempTableInfoIt = TempTablesState->FindInfo(table, false); if (tempTableInfoIt != TempTablesState->TempTables.end()) { - return tempTableInfoIt->first + TempTablesState->SessionId; + table = tempTableInfoIt->first + TempTablesState->SessionId; } - return op.GetTable(); - }(); - - const auto newOp = TYdbOperation(op.GetOperation()); + } const auto info = tableInfoMap.FindPtr(table); if (!info) {