diff --git a/ydb/core/kqp/executer_actor/kqp_scheme_executer.cpp b/ydb/core/kqp/executer_actor/kqp_scheme_executer.cpp index 953a306bd376..16b36386128b 100644 --- a/ydb/core/kqp/executer_actor/kqp_scheme_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_scheme_executer.cpp @@ -6,6 +6,7 @@ #include #include #include +#include namespace NKikimr::NKqp { @@ -49,7 +50,7 @@ class TKqpSchemeExecuter : public TActorBootstrapped { return NKikimrServices::TActivity::KQP_EXECUTER_ACTOR; } - TKqpSchemeExecuter(TKqpPhyTxHolder::TConstPtr phyTx, NKikimrKqp::EQueryType queryType, const TActorId& target, const TMaybe& requestType, + TKqpSchemeExecuter(TKqpPhyTxHolder::TConstPtr phyTx, NKikimrKqp::EQueryType queryType, const TActorId& target, const TMaybe& requestType, const TString& database, TIntrusiveConstPtr userToken, bool temporary, TString sessionId, TIntrusivePtr ctx) : PhyTx(phyTx) @@ -69,13 +70,11 @@ class TKqpSchemeExecuter : public TActorBootstrapped { } void StartBuildOperation() { - const auto& schemeOp = PhyTx->GetSchemeOperation(); - auto buildOp = schemeOp.GetBuildOperation(); Send(MakeTxProxyID(), new TEvTxUserProxy::TEvAllocateTxId); - Become(&TKqpSchemeExecuter::ExecuteState); + Become(&TKqpSchemeExecuter::ExecuteState); } - void Bootstrap() { + void MakeSchemeOperationRequest() { using TRequest = TEvTxUserProxy::TEvProposeTransaction; auto ev = MakeHolder(); @@ -124,30 +123,44 @@ class TKqpSchemeExecuter : public TActorBootstrapped { } case NKqpProto::TKqpSchemeOperation::kAlterTable: { - auto modifyScheme = schemeOp.GetAlterTable(); + const auto& modifyScheme = schemeOp.GetAlterTable(); ev->Record.MutableTransaction()->MutableModifyScheme()->CopyFrom(modifyScheme); break; } case NKqpProto::TKqpSchemeOperation::kBuildOperation: { - auto buildOp = schemeOp.GetBuildOperation(); return StartBuildOperation(); } case NKqpProto::TKqpSchemeOperation::kCreateUser: { - auto modifyScheme = schemeOp.GetCreateUser(); + const auto& modifyScheme = schemeOp.GetCreateUser(); ev->Record.MutableTransaction()->MutableModifyScheme()->CopyFrom(modifyScheme); break; } case NKqpProto::TKqpSchemeOperation::kAlterUser: { - auto modifyScheme = schemeOp.GetAlterUser(); + const auto& modifyScheme = schemeOp.GetAlterUser(); ev->Record.MutableTransaction()->MutableModifyScheme()->CopyFrom(modifyScheme); break; } case NKqpProto::TKqpSchemeOperation::kDropUser: { - auto modifyScheme = schemeOp.GetDropUser(); + const auto& modifyScheme = schemeOp.GetDropUser(); + ev->Record.MutableTransaction()->MutableModifyScheme()->CopyFrom(modifyScheme); + break; + } + case NKqpProto::TKqpSchemeOperation::kCreateExternalTable: { + const auto& modifyScheme = schemeOp.GetCreateExternalTable(); + ev->Record.MutableTransaction()->MutableModifyScheme()->CopyFrom(modifyScheme); + break; + } + case NKqpProto::TKqpSchemeOperation::kAlterExternalTable: { + const auto& modifyScheme = schemeOp.GetAlterExternalTable(); + ev->Record.MutableTransaction()->MutableModifyScheme()->CopyFrom(modifyScheme); + break; + } + case NKqpProto::TKqpSchemeOperation::kDropExternalTable: { + const auto& modifyScheme = schemeOp.GetDropExternalTable(); ev->Record.MutableTransaction()->MutableModifyScheme()->CopyFrom(modifyScheme); break; } @@ -191,7 +204,7 @@ class TKqpSchemeExecuter : public TActorBootstrapped { auto promise = NewPromise(); bool successOnNotExist = false; - bool failedOnAlreadyExists = false; + bool failedOnAlreadyExists = false; // exists/not exists semantics supported only in the query service. if (IsQueryService()) { successOnNotExist = ev->Record.GetTransaction().GetModifyScheme().GetSuccessOnNotExist(); @@ -218,6 +231,57 @@ class TKqpSchemeExecuter : public TActorBootstrapped { Become(&TKqpSchemeExecuter::ExecuteState); } + void MakeObjectRequest() { + const auto& schemeOp = PhyTx->GetSchemeOperation(); + NMetadata::IClassBehaviour::TPtr cBehaviour(NMetadata::IClassBehaviour::TFactory::Construct(schemeOp.GetObjectType())); + if (!cBehaviour) { + InternalError(TStringBuilder() << "Unsupported object type: \"" << schemeOp.GetObjectType() << "\""); + return; + } + + if (!cBehaviour->GetOperationsManager()) { + InternalError(TStringBuilder() << "Object type \"" << schemeOp.GetObjectType() << "\" does not have manager for operations"); + } + + auto* actorSystem = TActivationContext::ActorSystem(); + auto selfId = SelfId(); + + NMetadata::NModifications::IOperationsManager::TExternalModificationContext context; + context.SetDatabase(Database); + context.SetActorSystem(actorSystem); + if (UserToken) { + context.SetUserToken(*UserToken); + } + + auto resultFuture = cBehaviour->GetOperationsManager()->ExecutePrepared(schemeOp, cBehaviour, context); + + using TResultFuture = NThreading::TFuture; + resultFuture.Subscribe([actorSystem, selfId](const TResultFuture& f) { + const auto& status = f.GetValue(); + auto ev = MakeHolder(); + if (status.Ok()) { + ev->Result.SetSuccess(); + } else { + ev->Result.SetStatus(status.GetStatus()); + if (TString message = status.GetErrorMessage()) { + ev->Result.AddIssue(NYql::TIssue{message}); + } + } + actorSystem->Send(selfId, ev.Release()); + }); + + Become(&TKqpSchemeExecuter::ObjectExecuteState); + } + + void Bootstrap() { + const auto& schemeOp = PhyTx->GetSchemeOperation(); + if (schemeOp.GetObjectType()) { + MakeObjectRequest(); + } else { + MakeSchemeOperationRequest(); + } + } + public: STATEFN(ExecuteState) { try { @@ -240,6 +304,19 @@ class TKqpSchemeExecuter : public TActorBootstrapped { } } + STATEFN(ObjectExecuteState) { + try { + switch (ev->GetTypeRewrite()) { + hFunc(TEvPrivate::TEvResult, HandleExecute); + hFunc(TEvKqp::TEvAbortExecution, HandleAbortExecution); + default: + UnexpectedEvent("ObjectExecuteState", ev->GetTypeRewrite()); + } + } catch (const yexception& e) { + InternalError(e.what()); + } + } + void Handle(TEvTxUserProxy::TEvAllocateTxIdResult::TPtr& ev) { const auto* msg = ev->Get(); @@ -250,7 +327,7 @@ class TKqpSchemeExecuter : public TActorBootstrapped { void Navigate(const TActorId& schemeCache) { const auto& schemeOp = PhyTx->GetSchemeOperation(); - auto buildOp = schemeOp.GetBuildOperation(); + const auto& buildOp = schemeOp.GetBuildOperation(); const auto& path = buildOp.source_path(); const auto paths = NKikimr::SplitPath(path); @@ -258,10 +335,10 @@ class TKqpSchemeExecuter : public TActorBootstrapped { TString error = TStringBuilder() << "Failed to split table path " << path; return ReplyErrorAndDie(Ydb::StatusIds::BAD_REQUEST, NYql::TIssue(error)); } - + auto request = std::make_unique(); - request->DatabaseName = Database; + request->DatabaseName = Database; auto& entry = request->ResultSet.emplace_back(); entry.Operation = NSchemeCache::TSchemeCacheNavigate::OpPath; entry.Path = ::NKikimr::SplitPath(path); @@ -312,7 +389,7 @@ class TKqpSchemeExecuter : public TActorBootstrapped { } const auto& schemeOp = PhyTx->GetSchemeOperation(); - auto buildOp = schemeOp.GetBuildOperation(); + const auto& buildOp = schemeOp.GetBuildOperation(); SetSchemeShardId(domainInfo->ExtractSchemeShard()); auto req = std::make_unique(TxId, Database, buildOp); ForwardToSchemeShard(std::move(req)); diff --git a/ydb/core/kqp/executer_actor/ut/kqp_executer_ut.cpp b/ydb/core/kqp/executer_actor/ut/kqp_executer_ut.cpp index 7748e7411a76..989d2b3587e6 100644 --- a/ydb/core/kqp/executer_actor/ut/kqp_executer_ut.cpp +++ b/ydb/core/kqp/executer_actor/ut/kqp_executer_ut.cpp @@ -20,7 +20,7 @@ namespace { [[maybe_unused]] NKqpProto::TKqpPhyTx BuildTxPlan(const TString& sql, TIntrusivePtr gateway, - const TKikimrConfiguration::TPtr& config) + const TKikimrConfiguration::TPtr& config, NActors::TActorId* actorSystem) { auto cluster = TString(DefaultKikimrClusterName); @@ -28,7 +28,7 @@ NKqpProto::TKqpPhyTx BuildTxPlan(const TString& sql, TIntrusivePtr IModuleResolver::TPtr moduleResolver; UNIT_ASSERT(GetYqlDefaultModuleResolver(moduleCtx, moduleResolver)); - auto qp = CreateKqpHost(gateway, cluster, "/Root", config, moduleResolver, NYql::IHTTPGateway::Make()); + auto qp = CreateKqpHost(gateway, cluster, "/Root", config, moduleResolver, NYql::IHTTPGateway::Make(), nullptr, false, false, nullptr, actorSystem); auto result = qp->SyncPrepareDataQuery(sql, IKqpHost::TPrepareSettings()); result.Issues().PrintTo(Cerr); UNIT_ASSERT(result.Success()); @@ -97,7 +97,7 @@ Y_UNIT_TEST_SUITE(KqpExecuter) { UPSERT INTO [Root/EightShard] SELECT * FROM $itemsSource; - )", gateway, ctx); + )", gateway, ctx, kikimr.GetTestServer().GetRuntime()->GetAnyNodeActorSystem()); LogTxPlan(kikimr, tx); diff --git a/ydb/core/kqp/executer_actor/ya.make b/ydb/core/kqp/executer_actor/ya.make index 9747b1a6e9b2..7a7cee3e675e 100644 --- a/ydb/core/kqp/executer_actor/ya.make +++ b/ydb/core/kqp/executer_actor/ya.make @@ -33,6 +33,7 @@ PEERDIR( ydb/core/protos ydb/core/tx/long_tx_service/public ydb/core/ydb_convert + ydb/services/metadata/abstract ydb/library/mkql_proto ydb/library/mkql_proto/protos ydb/library/yql/dq/actors/compute diff --git a/ydb/core/kqp/gateway/behaviour/external_data_source/manager.cpp b/ydb/core/kqp/gateway/behaviour/external_data_source/manager.cpp index 1d30d534de0b..983562ee5d68 100644 --- a/ydb/core/kqp/gateway/behaviour/external_data_source/manager.cpp +++ b/ydb/core/kqp/gateway/behaviour/external_data_source/manager.cpp @@ -3,10 +3,10 @@ #include #include +#include #include #include #include -#include #include @@ -19,6 +19,17 @@ TString GetOrEmpty(const NYql::TCreateObjectSettings& container, const TString& return fValue ? *fValue : TString{}; } +void CheckFeatureFlag(TExternalDataSourceManager::TInternalModificationContext& context) { + auto* actorSystem = context.GetExternalData().GetActorSystem(); + if (!actorSystem) { + ythrow yexception() << "This place needs an actor system. Please contact internal support"; + } + + if (!AppData(actorSystem)->FeatureFlags.GetEnableExternalDataSources()) { + throw std::runtime_error("External data sources are disabled. Please contact your system administrator to enable it"); + } +} + void FillCreateExternalDataSourceDesc(NKikimrSchemeOp::TExternalDataSourceDescription& externaDataSourceDesc, const TString& name, const NYql::TCreateObjectSettings& settings) { @@ -72,6 +83,44 @@ void FillCreateExternalDataSourceDesc(NKikimrSchemeOp::TExternalDataSourceDescri } } +void FillCreateExternalDataSourceCommand(NKikimrSchemeOp::TModifyScheme& modifyScheme, const NYql::TObjectSettingsImpl& settings, + TExternalDataSourceManager::TInternalModificationContext& context) { + CheckFeatureFlag(context); + + std::pair pathPair; + { + TString error; + if (!TrySplitPathByDb(settings.GetObjectId(), context.GetExternalData().GetDatabase(), pathPair, error)) { + throw std::runtime_error(error.c_str()); + } + } + + modifyScheme.SetWorkingDir(pathPair.first); + modifyScheme.SetOperationType(NKikimrSchemeOp::ESchemeOpCreateExternalDataSource); + + NKikimrSchemeOp::TExternalDataSourceDescription& dataSourceDesc = *modifyScheme.MutableCreateExternalDataSource(); + FillCreateExternalDataSourceDesc(dataSourceDesc, pathPair.second, settings); +} + +void FillDropExternalDataSourceCommand(NKikimrSchemeOp::TModifyScheme& modifyScheme, const NYql::TObjectSettingsImpl& settings, + TExternalDataSourceManager::TInternalModificationContext& context) { + CheckFeatureFlag(context); + + std::pair pathPair; + { + TString error; + if (!NSchemeHelpers::TrySplitTablePath(settings.GetObjectId(), pathPair, error)) { + throw std::runtime_error(error.c_str()); + } + } + + modifyScheme.SetWorkingDir(pathPair.first); + modifyScheme.SetOperationType(NKikimrSchemeOp::ESchemeOpDropExternalDataSource); + + NKikimrSchemeOp::TDrop& drop = *modifyScheme.MutableDrop(); + drop.SetName(pathPair.second); +} + NThreading::TFuture SendSchemeRequest(TEvTxUserProxy::TEvProposeTransaction* request, TActorSystem* actorSystem, bool failedOnAlreadyExists = false) { auto promiseScheme = NThreading::NewPromise(); @@ -91,9 +140,10 @@ NThreading::TFuture SendScheme NThreading::TFuture TExternalDataSourceManager::DoModify(const NYql::TObjectSettingsImpl& settings, const ui32 nodeId, - NMetadata::IClassBehaviour::TPtr manager, + const NMetadata::IClassBehaviour::TPtr& manager, TInternalModificationContext& context) const { - Y_UNUSED(nodeId, manager, settings); + Y_UNUSED(nodeId, manager, settings); + try { switch (context.GetActivityType()) { case EActivityType::Upsert: case EActivityType::Undefined: @@ -104,85 +154,105 @@ NThreading::TFuture TExternalD case EActivityType::Drop: return DropExternalDataSource(settings, context); } + } catch (...) { + return NThreading::MakeFuture(TYqlConclusionStatus::Fail(CurrentExceptionMessage())); + } } NThreading::TFuture TExternalDataSourceManager::CreateExternalDataSource(const NYql::TObjectSettingsImpl& settings, TInternalModificationContext& context) const { using TRequest = TEvTxUserProxy::TEvProposeTransaction; - try { - auto* actorSystem = context.GetExternalData().GetActorSystem(); - if (!actorSystem) { - return NThreading::MakeFuture(TYqlConclusionStatus::Fail("This place needs an actor system. Please contact internal support")); - } - - if (!AppData(actorSystem)->FeatureFlags.GetEnableExternalDataSources()) { - return NThreading::MakeFuture(TYqlConclusionStatus::Fail("External data sources are disabled. Please contact your system administrator to enable it")); - } - - std::pair pathPair; - { - TString error; - if (!TrySplitPathByDb(settings.GetObjectId(), context.GetExternalData().GetDatabase(), pathPair, error)) { - return NThreading::MakeFuture(TYqlConclusionStatus::Fail(error)); - } - } + auto ev = MakeHolder(); + ev->Record.SetDatabaseName(context.GetExternalData().GetDatabase()); + if (context.GetExternalData().GetUserToken()) { + ev->Record.SetUserToken(context.GetExternalData().GetUserToken()->GetSerializedToken()); + } - auto ev = MakeHolder(); - ev->Record.SetDatabaseName(context.GetExternalData().GetDatabase()); - if (context.GetExternalData().GetUserToken()) { - ev->Record.SetUserToken(context.GetExternalData().GetUserToken()->GetSerializedToken()); - } - auto& schemeTx = *ev->Record.MutableTransaction()->MutableModifyScheme(); - schemeTx.SetWorkingDir(pathPair.first); - schemeTx.SetOperationType(NKikimrSchemeOp::ESchemeOpCreateExternalDataSource); + auto& schemeTx = *ev->Record.MutableTransaction()->MutableModifyScheme(); + FillCreateExternalDataSourceCommand(schemeTx, settings, context); - NKikimrSchemeOp::TExternalDataSourceDescription& dataSourceDesc = *schemeTx.MutableCreateExternalDataSource(); - FillCreateExternalDataSourceDesc(dataSourceDesc, pathPair.second, settings); - return SendSchemeRequest(ev.Release(), actorSystem, true); - } catch (...) { - return NThreading::MakeFuture(TYqlConclusionStatus::Fail(CurrentExceptionMessage())); - } + return SendSchemeRequest(ev.Release(), context.GetExternalData().GetActorSystem(), true); } NThreading::TFuture TExternalDataSourceManager::DropExternalDataSource(const NYql::TObjectSettingsImpl& settings, TInternalModificationContext& context) const { using TRequest = TEvTxUserProxy::TEvProposeTransaction; + auto ev = MakeHolder(); + ev->Record.SetDatabaseName(context.GetExternalData().GetDatabase()); + if (context.GetExternalData().GetUserToken()) { + ev->Record.SetUserToken(context.GetExternalData().GetUserToken()->GetSerializedToken()); + } + + auto& schemeTx = *ev->Record.MutableTransaction()->MutableModifyScheme(); + FillDropExternalDataSourceCommand(schemeTx, settings, context); + + return SendSchemeRequest(ev.Release(), context.GetExternalData().GetActorSystem()); +} + +TExternalDataSourceManager::TYqlConclusionStatus TExternalDataSourceManager::DoPrepare(NKqpProto::TKqpSchemeOperation& schemeOperation, const NYql::TObjectSettingsImpl& settings, + const NMetadata::IClassBehaviour::TPtr& manager, NMetadata::NModifications::IOperationsManager::TInternalModificationContext& context) const { + Y_UNUSED(manager); + try { - auto* actorSystem = context.GetExternalData().GetActorSystem(); - if (!actorSystem) { - return NThreading::MakeFuture(TYqlConclusionStatus::Fail("This place needs an actor system. Please contact internal support")); + switch (context.GetActivityType()) { + case EActivityType::Undefined: + return TYqlConclusionStatus::Fail("Undefined operation for EXTERNAL_DATA_SOURCE object"); + case EActivityType::Upsert: + return TYqlConclusionStatus::Fail("Upsert operation for EXTERNAL_DATA_SOURCE objects is not implemented"); + case EActivityType::Alter: + return TYqlConclusionStatus::Fail("Alter operation for EXTERNAL_DATA_SOURCE objects is not implemented"); + case EActivityType::Create: + PrepareCreateExternalDataSource(schemeOperation, settings, context); + break; + case EActivityType::Drop: + PrepareDropExternalDataSource(schemeOperation, settings, context); + break; } + return TYqlConclusionStatus::Success(); + } catch (...) { + return TYqlConclusionStatus::Fail(CurrentExceptionMessage()); + } +} - if (!AppData(actorSystem)->FeatureFlags.GetEnableExternalDataSources()) { - return NThreading::MakeFuture(TYqlConclusionStatus::Fail("External data sources are disabled. Please contact your system administrator to enable it")); - } +void TExternalDataSourceManager::PrepareCreateExternalDataSource(NKqpProto::TKqpSchemeOperation& schemeOperation, const NYql::TObjectSettingsImpl& settings, + TInternalModificationContext& context) const { + FillCreateExternalDataSourceCommand(*schemeOperation.MutableCreateExternalDataSource(), settings, context); +} - std::pair pathPair; - { - TString error; - if (!NYql::IKikimrGateway::TrySplitTablePath(settings.GetObjectId(), pathPair, error)) { - return NThreading::MakeFuture(TYqlConclusionStatus::Fail(error)); - } - } +void TExternalDataSourceManager::PrepareDropExternalDataSource(NKqpProto::TKqpSchemeOperation& schemeOperation, const NYql::TObjectSettingsImpl& settings, + TInternalModificationContext& context) const { + FillDropExternalDataSourceCommand(*schemeOperation.MutableDropExternalDataSource(), settings, context); +} - auto ev = MakeHolder(); - ev->Record.SetDatabaseName(context.GetExternalData().GetDatabase()); - if (context.GetExternalData().GetUserToken()) { - ev->Record.SetUserToken(context.GetExternalData().GetUserToken()->GetSerializedToken()); - } - auto& schemeTx = *ev->Record.MutableTransaction()->MutableModifyScheme(); - schemeTx.SetWorkingDir(pathPair.first); - schemeTx.SetOperationType(NKikimrSchemeOp::ESchemeOpDropExternalDataSource); +NThreading::TFuture TExternalDataSourceManager::ExecutePrepared(const NKqpProto::TKqpSchemeOperation& schemeOperation, + const NMetadata::IClassBehaviour::TPtr& /*manager*/, const IOperationsManager::TExternalModificationContext& context) const { + using TRequest = TEvTxUserProxy::TEvProposeTransaction; - NKikimrSchemeOp::TDrop& drop = *schemeTx.MutableDrop(); - drop.SetName(pathPair.second); - return SendSchemeRequest(ev.Release(), actorSystem); + auto ev = MakeHolder(); + ev->Record.SetDatabaseName(context.GetDatabase()); + if (context.GetUserToken()) { + ev->Record.SetUserToken(context.GetUserToken()->GetSerializedToken()); } - catch (...) { - return NThreading::MakeFuture(TYqlConclusionStatus::Fail(CurrentExceptionMessage())); + + auto& schemeTx = *ev->Record.MutableTransaction()->MutableModifyScheme(); + switch (schemeOperation.GetOperationCase()) { + case NKqpProto::TKqpSchemeOperation::kCreateExternalDataSource: + schemeTx.CopyFrom(schemeOperation.GetCreateExternalDataSource()); + break; + case NKqpProto::TKqpSchemeOperation::kAlterExternalDataSource: + schemeTx.CopyFrom(schemeOperation.GetAlterExternalDataSource()); + break; + case NKqpProto::TKqpSchemeOperation::kDropExternalDataSource: + schemeTx.CopyFrom(schemeOperation.GetDropExternalDataSource()); + break; + default: + return NThreading::MakeFuture(NMetadata::NModifications::IOperationsManager::TYqlConclusionStatus::Fail( + TStringBuilder() << "Execution of prepare operation for EXTERNAL_DATA_SOURCE object: unsupported operation: " << int(schemeOperation.GetOperationCase()))); } + + return SendSchemeRequest(ev.Release(), context.GetActorSystem(), true); } } diff --git a/ydb/core/kqp/gateway/behaviour/external_data_source/manager.h b/ydb/core/kqp/gateway/behaviour/external_data_source/manager.h index fb1045e203e1..3007bab19c30 100644 --- a/ydb/core/kqp/gateway/behaviour/external_data_source/manager.h +++ b/ydb/core/kqp/gateway/behaviour/external_data_source/manager.h @@ -11,12 +11,24 @@ class TExternalDataSourceManager: public NMetadata::NModifications::IOperationsM NThreading::TFuture DropExternalDataSource(const NYql::TObjectSettingsImpl& settings, TInternalModificationContext& context) const; + void PrepareCreateExternalDataSource(NKqpProto::TKqpSchemeOperation& schemeOperation, const NYql::TObjectSettingsImpl& settings, + TInternalModificationContext& context) const; + + void PrepareDropExternalDataSource(NKqpProto::TKqpSchemeOperation& schemeOperation, const NYql::TObjectSettingsImpl& settings, + TInternalModificationContext& context) const; + protected: NThreading::TFuture DoModify(const NYql::TObjectSettingsImpl& settings, const ui32 nodeId, - NMetadata::IClassBehaviour::TPtr manager, + const NMetadata::IClassBehaviour::TPtr& manager, TInternalModificationContext& context) const override; + IOperationsManager::TYqlConclusionStatus DoPrepare(NKqpProto::TKqpSchemeOperation& schemeOperation, const NYql::TObjectSettingsImpl& settings, + const NMetadata::IClassBehaviour::TPtr& manager, IOperationsManager::TInternalModificationContext& context) const override; + + NThreading::TFuture ExecutePrepared(const NKqpProto::TKqpSchemeOperation& schemeOperation, + const NMetadata::IClassBehaviour::TPtr& manager, const IOperationsManager::TExternalModificationContext& context) const override; + public: using NMetadata::NModifications::IOperationsManager::TYqlConclusionStatus; }; diff --git a/ydb/core/kqp/gateway/behaviour/external_data_source/ya.make b/ydb/core/kqp/gateway/behaviour/external_data_source/ya.make index 2b52c19cc94c..e7ed0ff336e9 100644 --- a/ydb/core/kqp/gateway/behaviour/external_data_source/ya.make +++ b/ydb/core/kqp/gateway/behaviour/external_data_source/ya.make @@ -9,6 +9,7 @@ PEERDIR( ydb/services/metadata/initializer ydb/services/metadata/abstract ydb/core/kqp/gateway/actors + ydb/core/kqp/gateway/utils ydb/core/kqp/gateway/behaviour/tablestore/operations ) diff --git a/ydb/core/kqp/gateway/behaviour/tablestore/manager.cpp b/ydb/core/kqp/gateway/behaviour/tablestore/manager.cpp index 1768938fb337..66d4fc26c974 100644 --- a/ydb/core/kqp/gateway/behaviour/tablestore/manager.cpp +++ b/ydb/core/kqp/gateway/behaviour/tablestore/manager.cpp @@ -12,63 +12,74 @@ namespace NKikimr::NKqp { NThreading::TFuture TTableStoreManager::DoModify(const NYql::TObjectSettingsImpl& settings, const ui32 nodeId, - NMetadata::IClassBehaviour::TPtr manager, TInternalModificationContext& context) const { - Y_UNUSED(nodeId); - Y_UNUSED(manager); - auto promise = NThreading::NewPromise(); - auto result = promise.GetFuture(); + const NMetadata::IClassBehaviour::TPtr& manager, TInternalModificationContext& context) const { + Y_UNUSED(nodeId); + Y_UNUSED(manager); + auto promise = NThreading::NewPromise(); + auto result = promise.GetFuture(); - auto* actorSystem = context.GetExternalData().GetActorSystem(); - if (!actorSystem) { - return NThreading::MakeFuture(TYqlConclusionStatus::Fail("This place needs an actor system. Please contact internal support")); - } + auto* actorSystem = context.GetExternalData().GetActorSystem(); + if (!actorSystem) { + return NThreading::MakeFuture(TYqlConclusionStatus::Fail("This place needs an actor system. Please contact internal support")); + } - switch (context.GetActivityType()) { - case EActivityType::Create: - case EActivityType::Upsert: - case EActivityType::Drop: - case EActivityType::Undefined: - return NThreading::MakeFuture(TYqlConclusionStatus::Fail("not implemented")); - case EActivityType::Alter: - try { - auto actionName = settings.GetFeaturesExtractor().Extract("ACTION"); - if (!actionName) { - return NThreading::MakeFuture(TYqlConclusionStatus::Fail("can't find ACTION parameter")); - } - ITableStoreOperation::TPtr operation(ITableStoreOperation::TFactory::Construct(*actionName)); - if (!operation) { - return NThreading::MakeFuture(TYqlConclusionStatus::Fail("invalid ACTION: " + *actionName)); - } - { - auto parsingResult = operation->Deserialize(settings); - if (!parsingResult) { - return NThreading::MakeFuture(TYqlConclusionStatus::Fail(parsingResult.GetErrorMessage())); - } - } - auto ev = MakeHolder(); - ev->Record.SetDatabaseName(context.GetExternalData().GetDatabase()); - if (context.GetExternalData().GetUserToken()) { - ev->Record.SetUserToken(context.GetExternalData().GetUserToken()->GetSerializedToken()); + switch (context.GetActivityType()) { + case EActivityType::Create: + case EActivityType::Upsert: + case EActivityType::Drop: + case EActivityType::Undefined: + return NThreading::MakeFuture(TYqlConclusionStatus::Fail("not implemented")); + case EActivityType::Alter: + try { + auto actionName = settings.GetFeaturesExtractor().Extract("ACTION"); + if (!actionName) { + return NThreading::MakeFuture(TYqlConclusionStatus::Fail("can't find ACTION parameter")); + } + ITableStoreOperation::TPtr operation(ITableStoreOperation::TFactory::Construct(*actionName)); + if (!operation) { + return NThreading::MakeFuture(TYqlConclusionStatus::Fail("invalid ACTION: " + *actionName)); + } + { + auto parsingResult = operation->Deserialize(settings); + if (!parsingResult) { + return NThreading::MakeFuture(TYqlConclusionStatus::Fail(parsingResult.GetErrorMessage())); } - auto& schemeTx = *ev->Record.MutableTransaction()->MutableModifyScheme(); - operation->SerializeScheme(schemeTx, IsStandalone); - - auto promiseScheme = NThreading::NewPromise(); - actorSystem->Register(new NKqp::TSchemeOpRequestHandler(ev.Release(), promiseScheme, false)); - return promiseScheme.GetFuture().Apply([](const NThreading::TFuture& f) { - if (f.HasValue() && !f.HasException() && f.GetValue().Success()) { - return TYqlConclusionStatus::Success(); - } else if (f.HasValue()) { - return TYqlConclusionStatus::Fail(f.GetValue().Status(), f.GetValue().Issues().ToString()); - } - return TYqlConclusionStatus::Fail("no value in result"); - }); - } catch (yexception& e) { - return NThreading::MakeFuture(TYqlConclusionStatus::Fail(e.what())); } + auto ev = MakeHolder(); + ev->Record.SetDatabaseName(context.GetExternalData().GetDatabase()); + if (context.GetExternalData().GetUserToken()) { + ev->Record.SetUserToken(context.GetExternalData().GetUserToken()->GetSerializedToken()); + } + auto& schemeTx = *ev->Record.MutableTransaction()->MutableModifyScheme(); + operation->SerializeScheme(schemeTx, IsStandalone); + + auto promiseScheme = NThreading::NewPromise(); + actorSystem->Register(new NKqp::TSchemeOpRequestHandler(ev.Release(), promiseScheme, false)); + return promiseScheme.GetFuture().Apply([](const NThreading::TFuture& f) { + if (f.HasValue() && !f.HasException() && f.GetValue().Success()) { + return TYqlConclusionStatus::Success(); + } else if (f.HasValue()) { + return TYqlConclusionStatus::Fail(f.GetValue().Status(), f.GetValue().Issues().ToString()); + } + return TYqlConclusionStatus::Fail("no value in result"); + }); + } catch (yexception& e) { + return NThreading::MakeFuture(TYqlConclusionStatus::Fail(e.what())); } - return result; + } + return result; } +NMetadata::NModifications::IOperationsManager::TYqlConclusionStatus TTableStoreManager::DoPrepare(NKqpProto::TKqpSchemeOperation& /*schemeOperation*/, const NYql::TObjectSettingsImpl& /*settings*/, + const NMetadata::IClassBehaviour::TPtr& /*manager*/, NMetadata::NModifications::IOperationsManager::TInternalModificationContext& /*context*/) const { + return NMetadata::NModifications::IOperationsManager::TYqlConclusionStatus::Fail( + "Prepare operations for TABLE objects are not supported"); } +NThreading::TFuture TTableStoreManager::ExecutePrepared(const NKqpProto::TKqpSchemeOperation& /*schemeOperation*/, + const NMetadata::IClassBehaviour::TPtr& /*manager*/, const IOperationsManager::TExternalModificationContext& /*context*/) const { + return NThreading::MakeFuture(NMetadata::NModifications::IOperationsManager::TYqlConclusionStatus::Fail( + "Execution of prepare operations for TABLE objects is not supported")); +} + +} diff --git a/ydb/core/kqp/gateway/behaviour/tablestore/manager.h b/ydb/core/kqp/gateway/behaviour/tablestore/manager.h index a4361e933762..58fd0e57f885 100644 --- a/ydb/core/kqp/gateway/behaviour/tablestore/manager.h +++ b/ydb/core/kqp/gateway/behaviour/tablestore/manager.h @@ -11,7 +11,13 @@ class TTableStoreManager: public NMetadata::NModifications::IOperationsManager { bool IsStandalone = false; protected: NThreading::TFuture DoModify(const NYql::TObjectSettingsImpl& settings, const ui32 nodeId, - NMetadata::IClassBehaviour::TPtr manager, TInternalModificationContext& context) const override; + const NMetadata::IClassBehaviour::TPtr& manager, TInternalModificationContext& context) const override; + + IOperationsManager::TYqlConclusionStatus DoPrepare(NKqpProto::TKqpSchemeOperation& schemeOperation, const NYql::TObjectSettingsImpl& settings, + const NMetadata::IClassBehaviour::TPtr& manager, IOperationsManager::TInternalModificationContext& context) const override; + + NThreading::TFuture ExecutePrepared(const NKqpProto::TKqpSchemeOperation& schemeOperation, + const NMetadata::IClassBehaviour::TPtr& manager, const IOperationsManager::TExternalModificationContext& context) const override; public: TTableStoreManager(bool isStandalone) : IsStandalone(isStandalone) diff --git a/ydb/core/kqp/gateway/behaviour/tablestore/operations/abstract.cpp b/ydb/core/kqp/gateway/behaviour/tablestore/operations/abstract.cpp index 682c303abdd6..76f9eea6d403 100644 --- a/ydb/core/kqp/gateway/behaviour/tablestore/operations/abstract.cpp +++ b/ydb/core/kqp/gateway/behaviour/tablestore/operations/abstract.cpp @@ -1,4 +1,5 @@ #include "abstract.h" +#include #include namespace NKikimr::NKqp { @@ -7,7 +8,7 @@ TConclusionStatus ITableStoreOperation::Deserialize(const NYql::TObjectSettingsI std::pair pathPair; { TString error; - if (!NYql::IKikimrGateway::TrySplitTablePath(settings.GetObjectId(), pathPair, error)) { + if (!NSchemeHelpers::TrySplitTablePath(settings.GetObjectId(), pathPair, error)) { return TConclusionStatus::Fail(error); } WorkingDir = pathPair.first; diff --git a/ydb/core/kqp/gateway/behaviour/tablestore/operations/ya.make b/ydb/core/kqp/gateway/behaviour/tablestore/operations/ya.make index ef81ef50a879..5325ed5c49a3 100644 --- a/ydb/core/kqp/gateway/behaviour/tablestore/operations/ya.make +++ b/ydb/core/kqp/gateway/behaviour/tablestore/operations/ya.make @@ -10,6 +10,7 @@ SRCS( PEERDIR( ydb/services/metadata/manager ydb/core/formats/arrow/compression + ydb/core/kqp/gateway/utils ydb/core/protos ) diff --git a/ydb/core/kqp/gateway/behaviour/ya.make b/ydb/core/kqp/gateway/behaviour/ya.make new file mode 100644 index 000000000000..741203bd73c9 --- /dev/null +++ b/ydb/core/kqp/gateway/behaviour/ya.make @@ -0,0 +1,5 @@ +RECURSE( + external_data_source + table + tablestore +) diff --git a/ydb/core/kqp/gateway/kqp_gateway.h b/ydb/core/kqp/gateway/kqp_gateway.h index 3e4477bb7ef0..21e16c3a710d 100644 --- a/ydb/core/kqp/gateway/kqp_gateway.h +++ b/ydb/core/kqp/gateway/kqp_gateway.h @@ -207,12 +207,6 @@ TIntrusivePtr CreateKikimrIcGateway(const TString& cluster, NKikimr std::shared_ptr&& metadataLoader, NActors::TActorSystem* actorSystem, ui32 nodeId, TKqpRequestCounters::TPtr counters, const NKikimrConfig::TQueryServiceConfig& queryServiceConfig = NKikimrConfig::TQueryServiceConfig()); -bool SplitTablePath(const TString& tableName, const TString& database, std::pair& pathPair, - TString& error, bool createDir); - -bool SetDatabaseForLoginOperation(TString& result, bool getDomainLoginOnly, TMaybe domainName, - const TString& database); - } // namespace NKikimr::NKqp template<> diff --git a/ydb/core/kqp/gateway/kqp_ic_gateway.cpp b/ydb/core/kqp/gateway/kqp_ic_gateway.cpp index 015eb888bb3d..6c9cf1511db6 100644 --- a/ydb/core/kqp/gateway/kqp_ic_gateway.cpp +++ b/ydb/core/kqp/gateway/kqp_ic_gateway.cpp @@ -12,6 +12,7 @@ #include #include #include +#include #include #include #include @@ -653,7 +654,7 @@ class TKqpExecLiteralRequestHandler: public TActorBootstrapped TFuture InvalidCluster(const TString& cluster) { - return MakeFuture(ResultFromError("Invalid cluster:" + cluster)); + return MakeFuture(ResultFromError("Invalid cluster: " + cluster)); } void KqpResponseToQueryResult(const NKikimrKqp::TEvQueryResponse& response, IKqpGateway::TQueryResult& queryResult) { @@ -1212,7 +1213,7 @@ class TKikimrIcGateway : public IKqpGateway { schemeTx.SetOperationType(NKikimrSchemeOp::ESchemeOpCreateExternalTable); NKikimrSchemeOp::TExternalTableDescription& externalTableDesc = *schemeTx.MutableCreateExternalTable(); - FillCreateExternalTableColumnDesc(externalTableDesc, pathPair.second, settings); + NSchemeHelpers::FillCreateExternalTableColumnDesc(externalTableDesc, pathPair.second, settings); return SendSchemeRequest(ev.Release(), true); } catch (yexception& e) { @@ -2300,13 +2301,13 @@ class TKikimrIcGateway : public IKqpGateway { } bool GetDatabaseForLoginOperation(TString& database) { - return SetDatabaseForLoginOperation(database, GetDomainLoginOnly(), GetDomainName(), GetDatabase()); + return NSchemeHelpers::SetDatabaseForLoginOperation(database, GetDomainLoginOnly(), GetDomainName(), GetDatabase()); } bool GetPathPair(const TString& tableName, std::pair& pathPair, TString& error, bool createDir) { - return SplitTablePath(tableName, Database, pathPair, error, createDir); + return NSchemeHelpers::SplitTablePath(tableName, Database, pathPair, error, createDir); } private: @@ -2357,33 +2358,6 @@ class TKikimrIcGateway : public IKqpGateway { schema.SetEngine(NKikimrSchemeOp::EColumnTableEngine::COLUMN_ENGINE_REPLACING_TIMESERIES); } - static void FillCreateExternalTableColumnDesc(NKikimrSchemeOp::TExternalTableDescription& externalTableDesc, - const TString& name, - const NYql::TCreateExternalTableSettings& settings) - { - externalTableDesc.SetName(name); - externalTableDesc.SetDataSourcePath(settings.DataSourcePath); - externalTableDesc.SetLocation(settings.Location); - externalTableDesc.SetSourceType("General"); - - Y_ENSURE(settings.ColumnOrder.size() == settings.Columns.size()); - for (const auto& name : settings.ColumnOrder) { - auto columnIt = settings.Columns.find(name); - Y_ENSURE(columnIt != settings.Columns.end()); - - TColumnDescription& columnDesc = *externalTableDesc.AddColumns(); - columnDesc.SetName(columnIt->second.Name); - columnDesc.SetType(columnIt->second.Type); - columnDesc.SetNotNull(columnIt->second.NotNull); - } - NKikimrExternalSources::TGeneral general; - auto& attributes = *general.mutable_attributes(); - for (const auto& [key, value]: settings.SourceTypeParameters) { - attributes.insert({key, value}); - } - externalTableDesc.SetContent(general.SerializeAsString()); - } - static void FillParameters(TQueryData::TPtr params, ::google::protobuf::Map, Ydb::TypedValue>* output) { if (!params) { return; @@ -2469,26 +2443,5 @@ TIntrusivePtr CreateKikimrIcGateway(const TString& cluster, NKikimr counters, queryServiceConfig); } -bool SplitTablePath(const TString& tableName, const TString& database, std::pair& pathPair, - TString& error, bool createDir) -{ - if (createDir) { - return TrySplitPathByDb(tableName, database, pathPair, error); - } else { - return IKqpGateway::TrySplitTablePath(tableName, pathPair, error); - } -} - -bool SetDatabaseForLoginOperation(TString& result, bool getDomainLoginOnly, TMaybe domainName, - const TString& database) -{ - if (getDomainLoginOnly && !domainName) { - return false; - } - result = domainName ? "/" + *domainName : database; - return true; -} - - } // namespace NKqp } // namespace NKikimr diff --git a/ydb/core/kqp/gateway/kqp_metadata_loader.cpp b/ydb/core/kqp/gateway/kqp_metadata_loader.cpp index 7941573930ba..3f8f887e21c0 100644 --- a/ydb/core/kqp/gateway/kqp_metadata_loader.cpp +++ b/ydb/core/kqp/gateway/kqp_metadata_loader.cpp @@ -3,6 +3,7 @@ #include #include +#include #include #include @@ -545,7 +546,7 @@ NThreading::TFuture TKqpTableMetadataLoader::LoadIndexMeta for (size_t i = 0; i < indexesCount; i++) { const auto& index = tableMetadata->Indexes[i]; - auto indexTablePath = NYql::IKikimrGateway::CreateIndexTablePath(tableName, index.Name); + auto indexTablePath = NSchemeHelpers::CreateIndexTablePath(tableName, index.Name); if (!index.SchemaVersion) { LOG_DEBUG_S(*ActorSystem, NKikimrServices::KQP_GATEWAY, "Load index metadata without schema version check index: " << index.Name); diff --git a/ydb/core/kqp/gateway/utils/scheme_helpers.cpp b/ydb/core/kqp/gateway/utils/scheme_helpers.cpp new file mode 100644 index 000000000000..cf9a823d04de --- /dev/null +++ b/ydb/core/kqp/gateway/utils/scheme_helpers.cpp @@ -0,0 +1,89 @@ +#include "scheme_helpers.h" + +#include +#include + +namespace NKikimr::NKqp::NSchemeHelpers { + +using namespace NKikimrSchemeOp; +using namespace NKikimrExternalSources; + +TString CanonizePath(const TString& path) { + if (path.empty()) { + return "/"; + } + + if (path[0] != '/') { + return "/" + path; + } + + return path; +} + +bool TrySplitTablePath(const TString& path, std::pair& result, TString& error) { + auto parts = NKikimr::SplitPath(path); + + if (parts.size() < 2) { + error = TString("Missing scheme root in table path: ") + path; + return false; + } + + result = std::make_pair( + CombinePath(parts.begin(), parts.end() - 1), + parts.back()); + + return true; +} + +bool SplitTablePath(const TString& tableName, const TString& database, std::pair& pathPair, + TString& error, bool createDir) +{ + if (createDir) { + return TrySplitPathByDb(tableName, database, pathPair, error); + } else { + return TrySplitTablePath(tableName, pathPair, error); + } +} + +TString CreateIndexTablePath(const TString& tableName, const TString& indexName) { + return tableName + "/" + indexName + "/indexImplTable"; +} + +bool SetDatabaseForLoginOperation(TString& result, bool getDomainLoginOnly, TMaybe domainName, + const TString& database) +{ + if (getDomainLoginOnly && !domainName) { + return false; + } + result = domainName ? "/" + *domainName : database; + return true; +} + +void FillCreateExternalTableColumnDesc(NKikimrSchemeOp::TExternalTableDescription& externalTableDesc, + const TString& name, + const NYql::TCreateExternalTableSettings& settings) +{ + externalTableDesc.SetName(name); + externalTableDesc.SetDataSourcePath(settings.DataSourcePath); + externalTableDesc.SetLocation(settings.Location); + externalTableDesc.SetSourceType("General"); + + Y_ENSURE(settings.ColumnOrder.size() == settings.Columns.size()); + for (const auto& name : settings.ColumnOrder) { + auto columnIt = settings.Columns.find(name); + Y_ENSURE(columnIt != settings.Columns.end()); + + TColumnDescription& columnDesc = *externalTableDesc.AddColumns(); + columnDesc.SetName(columnIt->second.Name); + columnDesc.SetType(columnIt->second.Type); + columnDesc.SetNotNull(columnIt->second.NotNull); + } + NKikimrExternalSources::TGeneral general; + auto& attributes = *general.mutable_attributes(); + for (const auto& [key, value]: settings.SourceTypeParameters) { + attributes.insert({key, value}); + } + externalTableDesc.SetContent(general.SerializeAsString()); +} + +} // namespace NKikimr::NKqp::NSchemeHelpers diff --git a/ydb/core/kqp/gateway/utils/scheme_helpers.h b/ydb/core/kqp/gateway/utils/scheme_helpers.h new file mode 100644 index 000000000000..f3b15b1548b2 --- /dev/null +++ b/ydb/core/kqp/gateway/utils/scheme_helpers.h @@ -0,0 +1,34 @@ +#pragma once +#include +#include + +#include +#include + +namespace NKikimr::NKqp::NSchemeHelpers { + +TString CanonizePath(const TString& path); + +template +TString CombinePath(TIter begin, TIter end, bool canonize = true) { + auto path = JoinRange("/", begin, end); + return canonize + ? CanonizePath(path) + : path; +} + +bool TrySplitTablePath(const TString& path, std::pair& result, TString& error); + +bool SplitTablePath(const TString& tableName, const TString& database, std::pair& pathPair, + TString& error, bool createDir); + +TString CreateIndexTablePath(const TString& tableName, const TString& indexName); + +bool SetDatabaseForLoginOperation(TString& result, bool getDomainLoginOnly, TMaybe domainName, + const TString& database); + +void FillCreateExternalTableColumnDesc(NKikimrSchemeOp::TExternalTableDescription& externalTableDesc, + const TString& name, + const NYql::TCreateExternalTableSettings& settings); + +} // namespace NKikimr::NKqp::NSchemeHelpers diff --git a/ydb/core/kqp/gateway/utils/ya.make b/ydb/core/kqp/gateway/utils/ya.make new file mode 100644 index 000000000000..148e81220eeb --- /dev/null +++ b/ydb/core/kqp/gateway/utils/ya.make @@ -0,0 +1,15 @@ +LIBRARY() + +SRCS( + scheme_helpers.cpp +) + +PEERDIR( + ydb/core/base + ydb/core/kqp/provider + ydb/core/protos +) + +YQL_LAST_ABI_VERSION() + +END() diff --git a/ydb/core/kqp/gateway/ya.make b/ydb/core/kqp/gateway/ya.make index d75c1bbb0b26..0e23dc0ed8c3 100644 --- a/ydb/core/kqp/gateway/ya.make +++ b/ydb/core/kqp/gateway/ya.make @@ -17,9 +17,17 @@ PEERDIR( ydb/core/kqp/gateway/behaviour/tablestore ydb/core/kqp/gateway/behaviour/table ydb/core/kqp/gateway/behaviour/external_data_source + ydb/core/kqp/gateway/utils ydb/library/yql/providers/result/expr_nodes ) YQL_LAST_ABI_VERSION() END() + +RECURSE( + actors + behaviour + local_rpc + utils +) diff --git a/ydb/core/kqp/host/kqp_gateway_proxy.cpp b/ydb/core/kqp/host/kqp_gateway_proxy.cpp index 0f6d15d6a149..3c43b973c427 100644 --- a/ydb/core/kqp/host/kqp_gateway_proxy.cpp +++ b/ydb/core/kqp/host/kqp_gateway_proxy.cpp @@ -1,8 +1,10 @@ #include "kqp_host_impl.h" #include +#include #include #include +#include namespace NKikimr::NKqp { @@ -397,9 +399,11 @@ bool IsDdlPrepareAllowed(TKikimrSessionContext& sessionCtx) { class TKqpGatewayProxy : public IKikimrGateway { public: TKqpGatewayProxy(const TIntrusivePtr& gateway, - const TIntrusivePtr& sessionCtx) + const TIntrusivePtr& sessionCtx, + TActorSystem* actorSystem) : Gateway(gateway) , SessionCtx(sessionCtx) + , ActorSystem(actorSystem) { YQL_ENSURE(Gateway); } @@ -425,6 +429,10 @@ class TKqpGatewayProxy : public IKikimrGateway { Gateway->SetToken(cluster, token); } + bool GetDatabaseForLoginOperation(TString& database) { + return NSchemeHelpers::SetDatabaseForLoginOperation(database, GetDomainLoginOnly(), GetDomainName(), GetDatabase()); + } + TFuture ListPath(const TString& cluster, const TString& path) override { return Gateway->ListPath(cluster, path); } @@ -440,7 +448,7 @@ class TKqpGatewayProxy : public IKikimrGateway { std::pair pathPair; TString error; - if (!SplitTablePath(metadata->Name, GetDatabase(), pathPair, error, createDir)) { + if (!NSchemeHelpers::SplitTablePath(metadata->Name, GetDatabase(), pathPair, error, createDir)) { return MakeFuture(ResultFromError(error)); } @@ -640,7 +648,7 @@ class TKqpGatewayProxy : public IKikimrGateway { if (buildSettings.has_column_build_operation()) { buildSettings.MutableAlterMainTablePayload()->PackFrom(modifyScheme); - phyTx.MutableSchemeOperation()->MutableBuildOperation()->CopyFrom(buildSettings); + phyTx.MutableSchemeOperation()->MutableBuildOperation()->CopyFrom(buildSettings); } else { phyTx.MutableSchemeOperation()->MutableAlterTable()->CopyFrom(modifyScheme); } @@ -729,7 +737,7 @@ class TKqpGatewayProxy : public IKikimrGateway { std::pair pathPair; TString error; - if (!SplitTablePath(metadata->Name, GetDatabase(), pathPair, error, false)) { + if (!NSchemeHelpers::SplitTablePath(metadata->Name, GetDatabase(), pathPair, error, false)) { return MakeFuture(ResultFromError(error)); } @@ -793,7 +801,7 @@ class TKqpGatewayProxy : public IKikimrGateway { auto createUserPromise = NewPromise(); TString database; - if (!SetDatabaseForLoginOperation(database, GetDomainLoginOnly(), GetDomainName(), GetDatabase())) { + if (!GetDatabaseForLoginOperation(database)) { return MakeFuture(ResultFromError("Couldn't get domain name")); } @@ -829,7 +837,7 @@ class TKqpGatewayProxy : public IKikimrGateway { auto alterUserPromise = NewPromise(); TString database; - if (!SetDatabaseForLoginOperation(database, GetDomainLoginOnly(), GetDomainName(), GetDatabase())) { + if (!GetDatabaseForLoginOperation(database)) { return MakeFuture(ResultFromError("Couldn't get domain name")); } @@ -864,7 +872,7 @@ class TKqpGatewayProxy : public IKikimrGateway { auto dropUserPromise = NewPromise(); TString database; - if (!SetDatabaseForLoginOperation(database, GetDomainLoginOnly(), GetDomainName(), GetDatabase())) { + if (!GetDatabaseForLoginOperation(database)) { return MakeFuture(ResultFromError("Couldn't get domain name")); } @@ -891,20 +899,115 @@ class TKqpGatewayProxy : public IKikimrGateway { return dropUserPromise.GetFuture(); } + struct TRemoveLastPhyTxHelper { + TRemoveLastPhyTxHelper() = default; + + ~TRemoveLastPhyTxHelper() { + if (Query) { + Query->MutableTransactions()->RemoveLast(); + } + } + + NKqpProto::TKqpPhyTx& Capture(NKqpProto::TKqpPhyQuery* query) { + Query = query; + return *Query->AddTransactions(); + } + + void Forget() { + Query = nullptr; + } + private: + NKqpProto::TKqpPhyQuery* Query = nullptr; + }; + + template + TGenericResult PrepareObjectOperation(const TString& cluster, const TSettings& settings, + NMetadata::NModifications::IOperationsManager::TYqlConclusionStatus (NMetadata::NModifications::IOperationsManager::* prepareMethod)(NKqpProto::TKqpSchemeOperation&, const TSettings&, const NMetadata::IClassBehaviour::TPtr&, const NMetadata::NModifications::IOperationsManager::TExternalModificationContext&) const) + { + TRemoveLastPhyTxHelper phyTxRemover; + try { + if (cluster != SessionCtx->GetCluster()) { + return ResultFromError("Invalid cluster: " + cluster); + } + + TString database; + if (!GetDatabaseForLoginOperation(database)) { + return ResultFromError("Couldn't get domain name"); + } + + NMetadata::IClassBehaviour::TPtr cBehaviour(NMetadata::IClassBehaviour::TPtr(NMetadata::IClassBehaviour::TFactory::Construct(settings.GetTypeId()))); + if (!cBehaviour) { + return ResultFromError(TStringBuilder() << "Incorrect object type: \"" << settings.GetTypeId() << "\""); + } + + if (!cBehaviour->GetOperationsManager()) { + return ResultFromError(TStringBuilder() << "Object type \"" << settings.GetTypeId() << "\" does not have manager for operations"); + } + + NMetadata::NModifications::IOperationsManager::TExternalModificationContext context; + context.SetDatabase(SessionCtx->GetDatabase()); + context.SetActorSystem(ActorSystem); + + auto& phyTx = phyTxRemover.Capture(SessionCtx->Query().PreparingQuery->MutablePhysicalQuery()); + phyTx.SetType(NKqpProto::TKqpPhyTx::TYPE_SCHEME); + phyTx.MutableSchemeOperation()->SetObjectType(settings.GetTypeId()); + + NMetadata::NModifications::IOperationsManager::TYqlConclusionStatus prepareStatus = + (cBehaviour->GetOperationsManager().get()->*prepareMethod)( + *phyTx.MutableSchemeOperation(), settings, cBehaviour, context); + + TGenericResult result; + if (prepareStatus.Ok()) { + result.SetSuccess(); + phyTxRemover.Forget(); + } else { + result.AddIssue(NYql::TIssue(prepareStatus.GetErrorMessage())); + result.SetStatus(prepareStatus.GetStatus()); + } + return result; + } catch (const std::exception& e) { + return ResultFromException(e); + } + } + TFuture UpsertObject(const TString& cluster, const TUpsertObjectSettings& settings) override { - FORWARD_ENSURE_NO_PREPARE(UpsertObject, cluster, settings); + CHECK_PREPARED_DDL(UpsertObject); + + if (IsPrepare()) { + return MakeFuture(PrepareObjectOperation(cluster, settings, &NMetadata::NModifications::IOperationsManager::PrepareUpsertObjectSchemeOperation)); + } else { + return Gateway->UpsertObject(cluster, settings); + } } TFuture CreateObject(const TString& cluster, const TCreateObjectSettings& settings) override { - FORWARD_ENSURE_NO_PREPARE(CreateObject, cluster, settings); + CHECK_PREPARED_DDL(CreateObject); + + if (IsPrepare()) { + return MakeFuture(PrepareObjectOperation(cluster, settings, &NMetadata::NModifications::IOperationsManager::PrepareCreateObjectSchemeOperation)); + } else { + return Gateway->CreateObject(cluster, settings); + } } TFuture AlterObject(const TString& cluster, const TAlterObjectSettings& settings) override { - FORWARD_ENSURE_NO_PREPARE(AlterObject, cluster, settings); + CHECK_PREPARED_DDL(AlterObject); + + if (IsPrepare()) { + return MakeFuture(PrepareObjectOperation(cluster, settings, &NMetadata::NModifications::IOperationsManager::PrepareAlterObjectSchemeOperation)); + } else { + return Gateway->AlterObject(cluster, settings); + } } TFuture DropObject(const TString& cluster, const TDropObjectSettings& settings) override { - FORWARD_ENSURE_NO_PREPARE(DropObject, cluster, settings); + CHECK_PREPARED_DDL(DropObject); + + if (IsPrepare()) { + return MakeFuture(PrepareObjectOperation(cluster, settings, &NMetadata::NModifications::IOperationsManager::PrepareDropObjectSchemeOperation)); + } else { + return Gateway->DropObject(cluster, settings); + } } TFuture CreateGroup(const TString& cluster, const TCreateGroupSettings& settings) override { @@ -913,7 +1016,7 @@ class TKqpGatewayProxy : public IKikimrGateway { auto createGroupPromise = NewPromise(); TString database; - if (!SetDatabaseForLoginOperation(database, GetDomainLoginOnly(), GetDomainName(), GetDatabase())) { + if (!GetDatabaseForLoginOperation(database)) { return MakeFuture(ResultFromError("Couldn't get domain name")); } @@ -950,7 +1053,7 @@ class TKqpGatewayProxy : public IKikimrGateway { auto alterGroupPromise = NewPromise(); TString database; - if (!SetDatabaseForLoginOperation(database, GetDomainLoginOnly(), GetDomainName(), GetDatabase())) { + if (!GetDatabaseForLoginOperation(database)) { return MakeFuture(ResultFromError("Couldn't get domain name")); } @@ -977,7 +1080,7 @@ class TKqpGatewayProxy : public IKikimrGateway { auto renameGroupPromise = NewPromise(); TString database; - if (!SetDatabaseForLoginOperation(database, GetDomainLoginOnly(), GetDomainName(), GetDatabase())) { + if (!GetDatabaseForLoginOperation(database)) { return MakeFuture(ResultFromError("Couldn't get domain name")); } @@ -1010,7 +1113,7 @@ class TKqpGatewayProxy : public IKikimrGateway { auto dropGroupPromise = NewPromise(); TString database; - if (!SetDatabaseForLoginOperation(database, GetDomainLoginOnly(), GetDomainName(), GetDatabase())) { + if (!GetDatabaseForLoginOperation(database)) { return MakeFuture(ResultFromError("Couldn't get domain name")); } @@ -1068,7 +1171,38 @@ class TKqpGatewayProxy : public IKikimrGateway { TFuture CreateExternalTable(const TString& cluster, const TCreateExternalTableSettings& settings, bool createDir) override { - FORWARD_ENSURE_NO_PREPARE(CreateExternalTable, cluster, settings, createDir); + CHECK_PREPARED_DDL(CreateExternalTable); + + if (IsPrepare()) { + if (cluster != SessionCtx->GetCluster()) { + return MakeFuture(ResultFromError("Invalid cluster: " + cluster)); + } + + std::pair pathPair; + { + TString error; + if (!NSchemeHelpers::SplitTablePath(settings.ExternalTable, GetDatabase(), pathPair, error, createDir)) { + return MakeFuture(ResultFromError(error)); + } + } + + TRemoveLastPhyTxHelper phyTxRemover; + auto& phyTx = phyTxRemover.Capture(SessionCtx->Query().PreparingQuery->MutablePhysicalQuery()); + phyTx.SetType(NKqpProto::TKqpPhyTx::TYPE_SCHEME); + + auto& schemeTx = *phyTx.MutableSchemeOperation()->MutableCreateExternalTable(); + schemeTx.SetWorkingDir(pathPair.first); + schemeTx.SetOperationType(NKikimrSchemeOp::ESchemeOpCreateExternalTable); + + NKikimrSchemeOp::TExternalTableDescription& externalTableDesc = *schemeTx.MutableCreateExternalTable(); + NSchemeHelpers::FillCreateExternalTableColumnDesc(externalTableDesc, pathPair.second, settings); + TGenericResult result; + result.SetSuccess(); + phyTxRemover.Forget(); + return MakeFuture(result); + } else { + return Gateway->CreateExternalTable(cluster, settings, createDir); + } } TFuture AlterExternalTable(const TString& cluster, @@ -1080,7 +1214,39 @@ class TKqpGatewayProxy : public IKikimrGateway { TFuture DropExternalTable(const TString& cluster, const TDropExternalTableSettings& settings) override { - FORWARD_ENSURE_NO_PREPARE(DropExternalTable, cluster, settings); + CHECK_PREPARED_DDL(DropExternalTable); + + if (IsPrepare()) { + if (cluster != SessionCtx->GetCluster()) { + return MakeFuture(ResultFromError("Invalid cluster: " + cluster)); + } + + std::pair pathPair; + { + TString error; + if (!NSchemeHelpers::SplitTablePath(settings.ExternalTable, GetDatabase(), pathPair, error, false)) { + return MakeFuture(ResultFromError(error)); + } + } + + TRemoveLastPhyTxHelper phyTxRemover; + auto& phyTx = phyTxRemover.Capture(SessionCtx->Query().PreparingQuery->MutablePhysicalQuery()); + phyTx.SetType(NKqpProto::TKqpPhyTx::TYPE_SCHEME); + + auto& schemeTx = *phyTx.MutableSchemeOperation()->MutableDropExternalTable(); + schemeTx.SetWorkingDir(pathPair.first); + schemeTx.SetOperationType(NKikimrSchemeOp::ESchemeOpDropExternalTable); + + NKikimrSchemeOp::TDrop& drop = *schemeTx.MutableDrop(); + drop.SetName(pathPair.second); + + TGenericResult result; + result.SetSuccess(); + phyTxRemover.Forget(); + return MakeFuture(result); + } else { + return Gateway->DropExternalTable(cluster, settings); + } } TVector GetCollectedSchemeData() override { @@ -1150,6 +1316,7 @@ class TKqpGatewayProxy : public IKikimrGateway { private: TIntrusivePtr Gateway; TIntrusivePtr SessionCtx; + TActorSystem* ActorSystem = nullptr; }; #undef FORWARD_ENSURE_NO_PREPARE @@ -1158,9 +1325,9 @@ class TKqpGatewayProxy : public IKikimrGateway { } // namespace TIntrusivePtr CreateKqpGatewayProxy(const TIntrusivePtr& gateway, - const TIntrusivePtr& sessionCtx) + const TIntrusivePtr& sessionCtx, TActorSystem* actorSystem) { - return MakeIntrusive(gateway, sessionCtx); + return MakeIntrusive(gateway, sessionCtx, actorSystem); } } // namespace NKikimr::NKqp diff --git a/ydb/core/kqp/host/kqp_host.cpp b/ydb/core/kqp/host/kqp_host.cpp index e25229efdd72..7560fb9be217 100644 --- a/ydb/core/kqp/host/kqp_host.cpp +++ b/ydb/core/kqp/host/kqp_host.cpp @@ -819,8 +819,8 @@ class TKqpQueryExecutor : public IKikimrQueryExecutor { auto node = TExprBase(exprNode); hasFederatedSorcesOrSinks = hasFederatedSorcesOrSinks - || node.Maybe() - || node.Maybe() + || node.Maybe() + || node.Maybe() || node.Maybe(); return !hasFederatedSorcesOrSinks; @@ -923,7 +923,8 @@ class TKqpHost : public IKqpHost { TKikimrConfiguration::TPtr config, IModuleResolver::TPtr moduleResolver, std::optional federatedQuerySetup, const NKikimr::NMiniKQL::IFunctionRegistry* funcRegistry, bool keepConfigChanges, - bool isInternalCall, TKqpTempTablesState::TConstPtr tempTablesState = nullptr) + bool isInternalCall, TKqpTempTablesState::TConstPtr tempTablesState = nullptr, + NActors::TActorSystem* actorSystem = nullptr) : Gateway(gateway) , Cluster(cluster) , ExprCtx(new TExprContext()) @@ -936,6 +937,7 @@ class TKqpHost : public IKqpHost { , PlanBuilder(CreatePlanBuilder(*TypesCtx)) , FakeWorld(ExprCtx->NewWorld(TPosition())) , ExecuteCtx(MakeIntrusive()) + , ActorSystem(actorSystem ? actorSystem : NActors::TActivationContext::ActorSystem()) { if (funcRegistry) { FuncRegistry = funcRegistry; @@ -945,6 +947,7 @@ class TKqpHost : public IKqpHost { } SessionCtx->SetDatabase(database); + SessionCtx->SetCluster(cluster); SessionCtx->SetTempTables(std::move(tempTablesState)); } @@ -1510,7 +1513,7 @@ class TKqpHost : public IKqpHost { }; // Kikimr provider - auto gatewayProxy = CreateKqpGatewayProxy(Gateway, SessionCtx); + auto gatewayProxy = CreateKqpGatewayProxy(Gateway, SessionCtx, ActorSystem); auto queryExecutor = MakeIntrusive(Gateway, Cluster, SessionCtx, KqpRunner); auto kikimrDataSource = CreateKikimrDataSource(*FuncRegistry, *TypesCtx, gatewayProxy, SessionCtx, @@ -1648,6 +1651,7 @@ class TKqpHost : public IKqpHost { NExternalSource::IExternalSourceFactory::TPtr ExternalSourceFactory{NExternalSource::CreateExternalSourceFactory({})}; TKqpTempTablesState::TConstPtr TempTablesState; + NActors::TActorSystem* ActorSystem = nullptr; }; } // namespace @@ -1669,10 +1673,10 @@ TIntrusivePtr CreateKqpHost(TIntrusivePtr gateway, const TString& cluster, const TString& database, TKikimrConfiguration::TPtr config, IModuleResolver::TPtr moduleResolver, std::optional federatedQuerySetup, const NKikimr::NMiniKQL::IFunctionRegistry* funcRegistry, bool keepConfigChanges, bool isInternalCall, - TKqpTempTablesState::TConstPtr tempTablesState) + TKqpTempTablesState::TConstPtr tempTablesState, NActors::TActorSystem* actorSystem) { return MakeIntrusive(gateway, cluster, database, config, moduleResolver, federatedQuerySetup, funcRegistry, - keepConfigChanges, isInternalCall, std::move(tempTablesState)); + keepConfigChanges, isInternalCall, std::move(tempTablesState), actorSystem); } } // namespace NKqp diff --git a/ydb/core/kqp/host/kqp_host.h b/ydb/core/kqp/host/kqp_host.h index 5423b17bb53f..445d77e8875e 100644 --- a/ydb/core/kqp/host/kqp_host.h +++ b/ydb/core/kqp/host/kqp_host.h @@ -7,6 +7,10 @@ #include #include +namespace NActors { +class TActorSystem; +} // namespace NActors + namespace NKikimr { namespace NKqp { @@ -105,7 +109,8 @@ class IKqpHost : public TThrRefBase { TIntrusivePtr CreateKqpHost(TIntrusivePtr gateway, const TString& cluster, const TString& database, NYql::TKikimrConfiguration::TPtr config, NYql::IModuleResolver::TPtr moduleResolver, std::optional federatedQuerySetup, const NKikimr::NMiniKQL::IFunctionRegistry* funcRegistry = nullptr, - bool keepConfigChanges = false, bool isInternalCall = false, TKqpTempTablesState::TConstPtr tempTablesState = nullptr); + bool keepConfigChanges = false, bool isInternalCall = false, TKqpTempTablesState::TConstPtr tempTablesState = nullptr, + NActors::TActorSystem* actorSystem = nullptr /*take from TLS by default*/); } // namespace NKqp } // namespace NKikimr diff --git a/ydb/core/kqp/host/kqp_host_impl.h b/ydb/core/kqp/host/kqp_host_impl.h index 4a345ab2aaa0..550f9e2776d3 100644 --- a/ydb/core/kqp/host/kqp_host_impl.h +++ b/ydb/core/kqp/host/kqp_host_impl.h @@ -257,7 +257,7 @@ TAutoPtr CreateKqpTypeAnnotationTransformer(const TStri TAutoPtr CreateKqpCheckQueryTransformer(); TIntrusivePtr CreateKqpGatewayProxy(const TIntrusivePtr& gateway, - const TIntrusivePtr& sessionCtx); + const TIntrusivePtr& sessionCtx, TActorSystem* actorSystem); } // namespace NKqp } // namespace NKikimr diff --git a/ydb/core/kqp/host/ya.make b/ydb/core/kqp/host/ya.make index cb9a5cfc95b4..7c6cea9bb0ca 100644 --- a/ydb/core/kqp/host/ya.make +++ b/ydb/core/kqp/host/ya.make @@ -14,6 +14,7 @@ PEERDIR( ydb/core/base ydb/core/kqp/common ydb/core/kqp/federated_query + ydb/core/kqp/gateway/utils ydb/core/kqp/opt ydb/core/kqp/provider ydb/core/tx/long_tx_service/public diff --git a/ydb/core/kqp/provider/yql_kikimr_exec.cpp b/ydb/core/kqp/provider/yql_kikimr_exec.cpp index 8b8224529760..99d4a78d7c56 100644 --- a/ydb/core/kqp/provider/yql_kikimr_exec.cpp +++ b/ydb/core/kqp/provider/yql_kikimr_exec.cpp @@ -807,14 +807,9 @@ class TObjectModifierTransformer { , ActionInfo(actionInfo) , SessionCtx(sessionCtx) { - } - std::pair Execute(const TKiObject& kiObject, const TExprNode::TPtr& input, TExprContext& ctx) { - if (!EnsureNotPrepare(ActionInfo + " " + kiObject.TypeId(), input->Pos(), SessionCtx->Query(), ctx)) { - return SyncError(); - } - + std::pair Execute(const TKiObject& kiObject, const TExprNode::TPtr& input, TExprContext& /*ctx*/) { auto requireStatus = RequireChild(*input, 0); if (requireStatus.Level != IGraphTransformer::TStatus::Ok) { return SyncStatus(requireStatus); @@ -825,9 +820,8 @@ class TObjectModifierTransformer { if (!settings.DeserializeFromKi(kiObject)) { return SyncError(); } - bool prepareOnly = SessionCtx->Query().PrepareOnly; - auto future = prepareOnly ? CreateDummySuccess() : DoExecute(cluster, settings); + auto future = DoExecute(cluster, settings); return WrapFuture(future, [](const IKikimrGateway::TGenericResult& res, const TExprNode::TPtr& input, TExprContext& ctx) { Y_UNUSED(res); @@ -1109,7 +1103,7 @@ class TKiSinkCallableExecutionTransformer : public TAsyncCallbackTransformer(); for(const auto& constraint: columnConstraints.Value().Cast()) { if (constraint.Name().Value() == "serial") { - ctx.AddError(TIssue(ctx.GetPosition(constraint.Pos()), + ctx.AddError(TIssue(ctx.GetPosition(constraint.Pos()), "Column addition with serial data type is unsupported")); return SyncError(); } else if (constraint.Name().Value() == "default") { diff --git a/ydb/core/kqp/provider/yql_kikimr_gateway.cpp b/ydb/core/kqp/provider/yql_kikimr_gateway.cpp index c21fba083dd9..77b851606b7f 100644 --- a/ydb/core/kqp/provider/yql_kikimr_gateway.cpp +++ b/ydb/core/kqp/provider/yql_kikimr_gateway.cpp @@ -6,7 +6,9 @@ #include #include #include +#include #include +#include #include #include @@ -39,7 +41,7 @@ static void CreateDirs(std::shared_ptr> partsHolder, size_t ind CreateDirs(partsHolder, index + 1, promise, createDir); }); - TString basePath = IKikimrGateway::CombinePath(parts.begin(), parts.begin() + index); + TString basePath = NKikimr::NKqp::NSchemeHelpers::CombinePath(parts.begin(), parts.begin() + index); createDir(basePath, parts[index], partPromise); } @@ -52,42 +54,9 @@ TKikimrPathId TKikimrPathId::Parse(const TStringBuf& str) { return TKikimrPathId(FromString(ownerStr), FromString(idStr)); } -TString IKikimrGateway::CanonizePath(const TString& path) { - if (path.empty()) { - return "/"; - } - - if (path[0] != '/') { - return "/" + path; - } - - return path; -} - -TVector IKikimrGateway::SplitPath(const TString& path) { - TVector parts; - Split(path, "/", parts); - return parts; -} - -bool IKikimrGateway::TrySplitTablePath(const TString& path, std::pair& result, TString& error) { - auto parts = SplitPath(path); - - if (parts.size() < 2) { - error = TString("Missing scheme root in table path: ") + path; - return false; - } - - result = std::make_pair( - CombinePath(parts.begin(), parts.end() - 1), - parts.back()); - - return true; -} - TFuture IKikimrGateway::CreatePath(const TString& path, TCreateDirFunc createDir) { - auto partsHolder = std::make_shared>(SplitPath(path)); - auto &parts = *partsHolder; + auto partsHolder = std::make_shared>(NKikimr::SplitPath(path)); + auto& parts = *partsHolder; if (parts.size() < 2) { TGenericResult result; @@ -101,11 +70,6 @@ TFuture IKikimrGateway::CreatePath(const TString return pathPromise.GetFuture(); } -TString IKikimrGateway::CreateIndexTablePath(const TString& tableName, const TString& indexName) { - return tableName + "/" + indexName + "/indexImplTable"; -} - - void IKikimrGateway::BuildIndexMetadata(TTableMetadataResult& loadTableMetadataResult) { auto tableMetadata = loadTableMetadataResult.Metadata; YQL_ENSURE(tableMetadata); @@ -128,7 +92,7 @@ void IKikimrGateway::BuildIndexMetadata(TTableMetadataResult& loadTableMetadataR tableMetadata->SecondaryGlobalIndexMetadata.resize(indexesCount); for (size_t i = 0; i < indexesCount; i++) { const auto& index = tableMetadata->Indexes[i]; - auto indexTablePath = CreateIndexTablePath(tableName, index.Name); + auto indexTablePath = NKikimr::NKqp::NSchemeHelpers::CreateIndexTablePath(tableName, index.Name); NKikimr::NTableIndex::TTableColumns indexTableColumns = NKikimr::NTableIndex::CalcTableImplDescription( tableColumns, NKikimr::NTableIndex::TIndexColumns{index.KeyColumns, {}}); diff --git a/ydb/core/kqp/provider/yql_kikimr_gateway.h b/ydb/core/kqp/provider/yql_kikimr_gateway.h index 63c67494eff4..4f5fd8b86606 100644 --- a/ydb/core/kqp/provider/yql_kikimr_gateway.h +++ b/ydb/core/kqp/provider/yql_kikimr_gateway.h @@ -850,24 +850,8 @@ class IKikimrGateway : public TThrRefBase { public: using TCreateDirFunc = std::function)>; - static TString CanonizePath(const TString& path); - - template - static TString CombinePath(TIter begin, TIter end, bool canonize = true) { - auto path = JoinRange("/", begin, end); - return canonize - ? CanonizePath(path) - : path; - } - - static TVector SplitPath(const TString& path); - - static bool TrySplitTablePath(const TString& path, std::pair& result, TString& error); - static NThreading::TFuture CreatePath(const TString& path, TCreateDirFunc createDir); - static TString CreateIndexTablePath(const TString& tableName, const TString& indexName); - static void BuildIndexMetadata(TTableMetadataResult& loadTableMetadataResult); }; diff --git a/ydb/core/kqp/provider/yql_kikimr_opt_build.cpp b/ydb/core/kqp/provider/yql_kikimr_opt_build.cpp index 3809402d7bc2..339c284694a0 100644 --- a/ydb/core/kqp/provider/yql_kikimr_opt_build.cpp +++ b/ydb/core/kqp/provider/yql_kikimr_opt_build.cpp @@ -1,6 +1,7 @@ #include "yql_kikimr_provider_impl.h" #include "yql_kikimr_gateway.h" +#include #include #include #include @@ -126,7 +127,7 @@ struct TKiExploreTxResults { auto view = key.GetView(); if (view && view->Name) { const auto& indexName = view->Name; - const auto indexTablePath = IKikimrGateway::CreateIndexTablePath(tableMeta->Name, indexName); + const auto indexTablePath = NKikimr::NKqp::NSchemeHelpers::CreateIndexTablePath(tableMeta->Name, indexName); auto indexIt = std::find_if(tableMeta->Indexes.begin(), tableMeta->Indexes.end(), [&indexName](const auto& index){ return index.Name == indexName; @@ -176,7 +177,7 @@ struct TKiExploreTxResults { continue; } - const auto indexTable = IKikimrGateway::CreateIndexTablePath(tableMeta->Name, index.Name); + const auto indexTable = NKikimr::NKqp::NSchemeHelpers::CreateIndexTablePath(tableMeta->Name, index.Name); ops[tableMeta->Name] |= TPrimitiveYdbOperation::Read; ops[indexTable] = TPrimitiveYdbOperation::Write; @@ -198,7 +199,7 @@ struct TKiExploreTxResults { continue; } - const auto indexTable = IKikimrGateway::CreateIndexTablePath(tableMeta->Name, index.Name); + const auto indexTable = NKikimr::NKqp::NSchemeHelpers::CreateIndexTablePath(tableMeta->Name, index.Name); for (const auto& column : index.KeyColumns) { if (updateColumns.contains(column)) { // delete old index values and upsert rows into index table diff --git a/ydb/core/kqp/provider/yql_kikimr_provider.h b/ydb/core/kqp/provider/yql_kikimr_provider.h index 549a34235785..f662ce6becfe 100644 --- a/ydb/core/kqp/provider/yql_kikimr_provider.h +++ b/ydb/core/kqp/provider/yql_kikimr_provider.h @@ -480,10 +480,18 @@ class TKikimrSessionContext : public TThrRefBase { UserName = userName; } + TString GetCluster() const { + return Cluster; + } + TString GetDatabase() const { return Database; } + void SetCluster(const TString& cluster) { + Cluster = cluster; + } + void SetDatabase(const TString& database) { Database = database; } @@ -512,6 +520,7 @@ class TKikimrSessionContext : public TThrRefBase { private: TString UserName; + TString Cluster; TString Database; TKikimrConfiguration::TPtr Configuration; TIntrusivePtr TablesData; diff --git a/ydb/core/kqp/ut/federated_query/common/common.cpp b/ydb/core/kqp/ut/federated_query/common/common.cpp index f0e791cebd81..2d49100c163a 100644 --- a/ydb/core/kqp/ut/federated_query/common/common.cpp +++ b/ydb/core/kqp/ut/federated_query/common/common.cpp @@ -25,21 +25,25 @@ namespace NKikimr::NKqp::NFederatedQueryTest { NKikimrConfig::TFeatureFlags featureFlags; featureFlags.SetEnableExternalDataSources(true); featureFlags.SetEnableScriptExecutionOperations(true); + if (!appConfig) { + appConfig.emplace(); + } + appConfig->MutableTableServiceConfig()->SetEnablePreparedDdl(true); auto federatedQuerySetupFactory = std::make_shared( httpGateway, connectorClient, nullptr, databaseAsyncResolver, - appConfig ? appConfig->GetQueryServiceConfig().GetS3() : NYql::TS3GatewayConfig(), - appConfig ? appConfig->GetQueryServiceConfig().GetGeneric() : NYql::TGenericGatewayConfig()); + appConfig->GetQueryServiceConfig().GetS3(), + appConfig->GetQueryServiceConfig().GetGeneric()); auto settings = TKikimrSettings() .SetFeatureFlags(featureFlags) .SetFederatedQuerySetupFactory(federatedQuerySetupFactory) .SetKqpSettings({}); - settings = appConfig ? settings.SetAppConfig(appConfig.value()) : settings.SetAppConfig({}); + settings = settings.SetAppConfig(appConfig.value()); return std::make_shared(settings); } diff --git a/ydb/core/kqp/ut/federated_query/s3/kqp_federated_scheme_ut.cpp b/ydb/core/kqp/ut/federated_query/s3/kqp_federated_scheme_ut.cpp new file mode 100644 index 000000000000..2b9ebf0dbd41 --- /dev/null +++ b/ydb/core/kqp/ut/federated_query/s3/kqp_federated_scheme_ut.cpp @@ -0,0 +1,150 @@ +#include "s3_recipe_ut_helpers.h" + +#include +#include +#include +#include +#include + +#include + +namespace NKikimr::NKqp { + +using namespace NYdb; +using namespace NYdb::NQuery; +using namespace NKikimr::NKqp::NFederatedQueryTest; +using namespace NTestUtils; +using namespace fmt::literals; + +Y_UNIT_TEST_SUITE(KqpFederatedSchemeTest) { + Y_UNIT_TEST(CreateExternalTable) { + enum EEx { + Empty, + IfExists, + IfNotExists, + }; + + CreateBucketWithObject("CreateExternalDataSourceBucket", "obj", TEST_CONTENT); + + auto kikimr = MakeKikimrRunner(NYql::IHTTPGateway::Make()); + + auto queryClient = kikimr->GetQueryClient(); + + auto logSql = [](const TString& sql, bool expectSuccess) { + Cerr << "Execute sql in test (expect " << (expectSuccess ? "success" : "fail") << "):\n" + << sql << Endl; + }; + + auto checkCreate = [&](bool expectSuccess, EEx exMode, int nameSuffix) { + UNIT_ASSERT_UNEQUAL(exMode, EEx::IfExists); + const TString ifNotExistsStatement = exMode == EEx::IfNotExists ? "IF NOT EXISTS" : ""; + const TString sql = fmt::format(R"sql( + CREATE EXTERNAL DATA SOURCE {if_not_exists} test_data_source_{name_suffix} WITH ( + SOURCE_TYPE="ObjectStorage", + LOCATION="{location}", + AUTH_METHOD="NONE" + ); + + CREATE EXTERNAL TABLE {if_not_exists} test_table_{name_suffix} ( + key Utf8 NOT NULL, + value Utf8 NOT NULL + ) WITH ( + DATA_SOURCE="test_data_source_{name_suffix}", + LOCATION="obj", + FORMAT="json_each_row" + ); + )sql", + "location"_a = GetBucketLocation("CreateExternalDataSourceBucket"), + "name_suffix"_a = nameSuffix, + "if_not_exists"_a = ifNotExistsStatement + ); + logSql(sql, expectSuccess); + auto result = queryClient.ExecuteQuery( + sql, + TTxControl::NoTx()).GetValueSync(); + + if (expectSuccess) { + UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); + } else { + UNIT_ASSERT(!result.IsSuccess()); + } + }; + + auto checkTableExists = [&](bool expectSuccess, int nameSuffix) { + // Check that we can use created external table + const TString sql = fmt::format(R"sql( + SELECT * FROM test_table_{name_suffix}; + )sql", + "name_suffix"_a = nameSuffix + ); + logSql(sql, expectSuccess); + auto result = queryClient.ExecuteQuery( + sql, + TTxControl::BeginTx().CommitTx()).GetValueSync(); + + if (expectSuccess) { + UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); + auto resultSet = result.GetResultSetParser(0); + UNIT_ASSERT_VALUES_EQUAL(resultSet.RowsCount(), 2); + } else { + UNIT_ASSERT(!result.IsSuccess()); + } + }; + + auto checkDrop = [&](bool expectSuccess, EEx exMode, int nameSuffix) { + UNIT_ASSERT_UNEQUAL(exMode, EEx::IfNotExists); + const TString ifExistsStatement = exMode == EEx::IfExists ? "IF EXISTS" : ""; + const TString sql = fmt::format(R"sql( + DROP EXTERNAL TABLE {if_exists} test_table_{name_suffix}; + DROP EXTERNAL DATA SOURCE {if_exists} test_data_source_{name_suffix}; + )sql", + "name_suffix"_a = nameSuffix, + "if_exists"_a = ifExistsStatement, + "name_suffix"_a = nameSuffix + ); + logSql(sql, expectSuccess); + auto result = queryClient.ExecuteQuery( + sql, + TTxControl::NoTx()).GetValueSync(); + + if (expectSuccess) { + UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); + } else { + UNIT_ASSERT(!result.IsSuccess()); + } + }; + + // usual create + checkCreate(true, EEx::Empty, 0); + checkTableExists(true, 0); + + // create already existing table + checkCreate(false, EEx::Empty, 0); // already + //checkCreate(true, EEx::IfNotExists, 0); + checkTableExists(true, 0); + + // usual drop + checkDrop(true, EEx::Empty, 0); + checkTableExists(false, 0); + checkDrop(false, EEx::Empty, 0); // no such table + + // drop if exists + //checkDrop(true, EEx::IfExists, 0); + checkTableExists(false, 0); + + // failed attempt to drop nonexisting table + checkDrop(false, EEx::Empty, 0); + + // create with if not exists + //checkCreate(true, EEx::IfNotExists, 1); // real creation + //checkTableExists(true, 1); + //checkCreate(true, EEx::IfNotExists, 1); + + // drop if exists + //checkDrop(true, EEx::IfExists, 1); // real drop + //checkTableExists(false, 1); + //checkDrop(true, EEx::IfExists, 1); + } +} + +} // namespace NKikimr::NKqp diff --git a/ydb/core/kqp/ut/federated_query/s3/ya.make b/ydb/core/kqp/ut/federated_query/s3/ya.make index a624248e2029..c4102a42fa2d 100644 --- a/ydb/core/kqp/ut/federated_query/s3/ya.make +++ b/ydb/core/kqp/ut/federated_query/s3/ya.make @@ -11,6 +11,7 @@ ENDIF() SRCS( kqp_federated_query_ut.cpp + kqp_federated_scheme_ut.cpp kqp_s3_plan_ut.cpp s3_recipe_ut_helpers.cpp ) diff --git a/ydb/core/kqp/ut/indexes/kqp_indexes_ut.cpp b/ydb/core/kqp/ut/indexes/kqp_indexes_ut.cpp index 4b9ad3cf363a..4607d23eeab7 100644 --- a/ydb/core/kqp/ut/indexes/kqp_indexes_ut.cpp +++ b/ydb/core/kqp/ut/indexes/kqp_indexes_ut.cpp @@ -41,6 +41,7 @@ TIntrusivePtr GetIcGateway(Tests::TServer& server) { TIntrusivePtr CreateKikimrQueryProcessor(TIntrusivePtr gateway, const TString& cluster, NYql::IModuleResolver::TPtr moduleResolver, + NActors::TActorSystem* actorSystem, const NKikimr::NMiniKQL::IFunctionRegistry* funcRegistry = nullptr, const TVector& settings = {}, bool keepConfigChanges = false) { @@ -54,7 +55,7 @@ TIntrusivePtr CreateKikimrQueryProcessor(TIntrusivePtr ga auto federatedQuerySetup = std::make_optional({NYql::IHTTPGateway::Make(), nullptr, nullptr, nullptr, {}, {}}); return NKqp::CreateKqpHost(gateway, cluster, "/Root", kikimrConfig, moduleResolver, - federatedQuerySetup, funcRegistry, funcRegistry, keepConfigChanges); + federatedQuerySetup, funcRegistry, funcRegistry, keepConfigChanges, nullptr, actorSystem); } NYql::NNodes::TExprBase GetExpr(const TString& ast, NYql::TExprContext& ctx, NYql::IModuleResolver* moduleResolver) { @@ -95,7 +96,7 @@ void CreateTableWithIndexWithState( metadata->KeyColumnNames.push_back("key"); { - auto gatewayProxy = CreateKqpGatewayProxy(gateway, nullptr); + auto gatewayProxy = CreateKqpGatewayProxy(gateway, nullptr, server.GetRuntime()->GetAnyNodeActorSystem()); auto result = gatewayProxy->CreateTable(metadata, true).ExtractValueSync(); UNIT_ASSERT(result.Success()); } @@ -125,7 +126,7 @@ Y_UNIT_TEST_SUITE(KqpIndexMetadata) { NYql::IModuleResolver::TPtr moduleResolver; YQL_ENSURE(GetYqlDefaultModuleResolver(moduleCtx, moduleResolver)); auto qp = CreateKikimrQueryProcessor(gateway, TestCluster, moduleResolver, - server.GetFunctionRegistry()); + server.GetRuntime()->GetAnyNodeActorSystem(), server.GetFunctionRegistry()); { const TString query = Q_(R"( @@ -281,7 +282,7 @@ Y_UNIT_TEST_SUITE(KqpIndexMetadata) { NYql::IModuleResolver::TPtr moduleResolver; YQL_ENSURE(GetYqlDefaultModuleResolver(moduleCtx, moduleResolver)); auto qp = CreateKikimrQueryProcessor(gateway, TestCluster, moduleResolver, - server.GetFunctionRegistry()); + server.GetRuntime()->GetAnyNodeActorSystem(), server.GetFunctionRegistry()); { const TString query = Q_(R"( @@ -1869,7 +1870,7 @@ Y_UNIT_TEST_SUITE(KqpIndexes) { { auto settings = TExplainDataQuerySettings(); settings.WithCollectFullDiagnostics(true); - + auto result = session.ExplainDataQuery( query, settings) .ExtractValueSync(); @@ -1902,7 +1903,7 @@ Y_UNIT_TEST_SUITE(KqpIndexes) { { auto settings = TExplainDataQuerySettings(); - + auto result = session.ExplainDataQuery( query, settings) .ExtractValueSync(); diff --git a/ydb/core/protos/kqp_physical.proto b/ydb/core/protos/kqp_physical.proto index 42b8708a3f3f..b55adcc9e2de 100644 --- a/ydb/core/protos/kqp_physical.proto +++ b/ydb/core/protos/kqp_physical.proto @@ -381,6 +381,11 @@ message TKqpSchemeOperation { FLAG_IF_NOT_EXISTS = 2; // set if IF_NOT_EXISTS modificator present }; + // If the object is modified, this field is not empty + // and in that case modification will be through + // NMetadata::IClassBehaviour way + string ObjectType = 100; + oneof Operation { NKikimrSchemeOp.TModifyScheme CreateTable = 1; NKikimrSchemeOp.TModifyScheme DropTable = 2; @@ -394,6 +399,12 @@ message TKqpSchemeOperation { NKikimrSchemeOp.TModifyScheme RemoveGroupMembership = 10; NKikimrSchemeOp.TModifyScheme DropGroup = 11; NKikimrSchemeOp.TModifyScheme RenameGroup = 12; + NKikimrSchemeOp.TModifyScheme CreateExternalDataSource = 13; + NKikimrSchemeOp.TModifyScheme AlterExternalDataSource = 14; + NKikimrSchemeOp.TModifyScheme DropExternalDataSource = 15; + NKikimrSchemeOp.TModifyScheme CreateExternalTable = 16; + NKikimrSchemeOp.TModifyScheme AlterExternalTable = 17; + NKikimrSchemeOp.TModifyScheme DropExternalTable = 18; } } diff --git a/ydb/core/tx/tiering/rule/manager.cpp b/ydb/core/tx/tiering/rule/manager.cpp index bdcfac875c64..0b3c24d9e74c 100644 --- a/ydb/core/tx/tiering/rule/manager.cpp +++ b/ydb/core/tx/tiering/rule/manager.cpp @@ -30,4 +30,16 @@ NMetadata::NModifications::TOperationParsingResult TTieringRulesManager::DoBuild return result; } +NMetadata::NModifications::IOperationsManager::TYqlConclusionStatus TTieringRulesManager::DoPrepare(NKqpProto::TKqpSchemeOperation& /*schemeOperation*/, const NYql::TObjectSettingsImpl& /*settings*/, + const NMetadata::IClassBehaviour::TPtr& /*manager*/, NMetadata::NModifications::IOperationsManager::TInternalModificationContext& /*context*/) const { + return NMetadata::NModifications::IOperationsManager::TYqlConclusionStatus::Fail( + "Prepare operations for TIERING_RULE objects are not supported"); +} + +NThreading::TFuture TTieringRulesManager::ExecutePrepared(const NKqpProto::TKqpSchemeOperation& /*schemeOperation*/, + const NMetadata::IClassBehaviour::TPtr& /*manager*/, const IOperationsManager::TExternalModificationContext& /*context*/) const { + return NThreading::MakeFuture(NMetadata::NModifications::IOperationsManager::TYqlConclusionStatus::Fail( + "Execution of prepare operations for TIERING_RULE objects is not supported")); +} + } diff --git a/ydb/core/tx/tiering/rule/manager.h b/ydb/core/tx/tiering/rule/manager.h index 3268c90021c4..45e7f767ef37 100644 --- a/ydb/core/tx/tiering/rule/manager.h +++ b/ydb/core/tx/tiering/rule/manager.h @@ -13,6 +13,12 @@ class TTieringRulesManager: public NMetadata::NModifications::TGenericOperations virtual NMetadata::NModifications::TOperationParsingResult DoBuildPatchFromSettings(const NYql::TObjectSettingsImpl& settings, TInternalModificationContext& context) const override; + + virtual IOperationsManager::TYqlConclusionStatus DoPrepare(NKqpProto::TKqpSchemeOperation& schemeOperation, const NYql::TObjectSettingsImpl& settings, + const NMetadata::IClassBehaviour::TPtr& manager, IOperationsManager::TInternalModificationContext& context) const override; + + virtual NThreading::TFuture ExecutePrepared(const NKqpProto::TKqpSchemeOperation& schemeOperation, + const NMetadata::IClassBehaviour::TPtr& manager, const IOperationsManager::TExternalModificationContext& context) const override; }; } diff --git a/ydb/core/tx/tiering/tier/manager.cpp b/ydb/core/tx/tiering/tier/manager.cpp index d9e9834f974f..b8ae6f8b9909 100644 --- a/ydb/core/tx/tiering/tier/manager.cpp +++ b/ydb/core/tx/tiering/tier/manager.cpp @@ -67,4 +67,16 @@ void TTiersManager::DoPrepareObjectsBeforeModification(std::vector& TActivationContext::Register(new TTierPreparationActor(std::move(patchedObjects), controller, context)); } +NMetadata::NModifications::IOperationsManager::TYqlConclusionStatus TTiersManager::DoPrepare(NKqpProto::TKqpSchemeOperation& /*schemeOperation*/, const NYql::TObjectSettingsImpl& /*settings*/, + const NMetadata::IClassBehaviour::TPtr& /*manager*/, NMetadata::NModifications::IOperationsManager::TInternalModificationContext& /*context*/) const { + return NMetadata::NModifications::IOperationsManager::TYqlConclusionStatus::Fail( + "Prepare operations for TIER objects are not supported"); +} + +NThreading::TFuture TTiersManager::ExecutePrepared(const NKqpProto::TKqpSchemeOperation& /*schemeOperation*/, + const NMetadata::IClassBehaviour::TPtr& /*manager*/, const IOperationsManager::TExternalModificationContext& /*context*/) const { + return NThreading::MakeFuture(NMetadata::NModifications::IOperationsManager::TYqlConclusionStatus::Fail( + "Execution of prepare operations for TIER objects is not supported")); +} + } diff --git a/ydb/core/tx/tiering/tier/manager.h b/ydb/core/tx/tiering/tier/manager.h index ba777648139c..3d279986e1f7 100644 --- a/ydb/core/tx/tiering/tier/manager.h +++ b/ydb/core/tx/tiering/tier/manager.h @@ -13,6 +13,12 @@ class TTiersManager: public NMetadata::NModifications::TGenericOperationsManager virtual NMetadata::NModifications::TOperationParsingResult DoBuildPatchFromSettings(const NYql::TObjectSettingsImpl& settings, TInternalModificationContext& context) const override; + + virtual IOperationsManager::TYqlConclusionStatus DoPrepare(NKqpProto::TKqpSchemeOperation& schemeOperation, const NYql::TObjectSettingsImpl& settings, + const NMetadata::IClassBehaviour::TPtr& manager, IOperationsManager::TInternalModificationContext& context) const override; + + virtual NThreading::TFuture ExecutePrepared(const NKqpProto::TKqpSchemeOperation& schemeOperation, + const NMetadata::IClassBehaviour::TPtr& manager, const IOperationsManager::TExternalModificationContext& context) const override; public: }; diff --git a/ydb/services/ext_index/metadata/manager.cpp b/ydb/services/ext_index/metadata/manager.cpp index 9e7b70128782..d1f1fc94743c 100644 --- a/ydb/services/ext_index/metadata/manager.cpp +++ b/ydb/services/ext_index/metadata/manager.cpp @@ -90,4 +90,16 @@ NModifications::TOperationParsingResult TManager::DoBuildPatchFromSettings(const } } +NMetadata::NModifications::IOperationsManager::TYqlConclusionStatus TManager::DoPrepare(NKqpProto::TKqpSchemeOperation& /*schemeOperation*/, const NYql::TObjectSettingsImpl& /*settings*/, + const NMetadata::IClassBehaviour::TPtr& /*manager*/, NMetadata::NModifications::IOperationsManager::TInternalModificationContext& /*context*/) const { + return NMetadata::NModifications::IOperationsManager::TYqlConclusionStatus::Fail( + "Prepare operations for CS_EXT_INDEX objects are not supported"); +} + +NThreading::TFuture TManager::ExecutePrepared(const NKqpProto::TKqpSchemeOperation& /*schemeOperation*/, + const NMetadata::IClassBehaviour::TPtr& /*manager*/, const IOperationsManager::TExternalModificationContext& /*context*/) const { + return NThreading::MakeFuture(NMetadata::NModifications::IOperationsManager::TYqlConclusionStatus::Fail( + "Execution of prepare operations for CS_EXT_INDEX objects is not supported")); +} + } diff --git a/ydb/services/ext_index/metadata/manager.h b/ydb/services/ext_index/metadata/manager.h index 8a97b4e5614b..48c6070819b4 100644 --- a/ydb/services/ext_index/metadata/manager.h +++ b/ydb/services/ext_index/metadata/manager.h @@ -15,6 +15,12 @@ class TManager: public NModifications::TGenericOperationsManager { virtual NModifications::TOperationParsingResult DoBuildPatchFromSettings( const NYql::TObjectSettingsImpl& settings, TInternalModificationContext& context) const override; + virtual IOperationsManager::TYqlConclusionStatus DoPrepare(NKqpProto::TKqpSchemeOperation& schemeOperation, const NYql::TObjectSettingsImpl& settings, + const NMetadata::IClassBehaviour::TPtr& manager, IOperationsManager::TInternalModificationContext& context) const override; + + virtual NThreading::TFuture ExecutePrepared(const NKqpProto::TKqpSchemeOperation& schemeOperation, + const NMetadata::IClassBehaviour::TPtr& manager, const IOperationsManager::TExternalModificationContext& context) const override; + public: }; diff --git a/ydb/services/metadata/initializer/manager.cpp b/ydb/services/metadata/initializer/manager.cpp index 0856618f2ebb..715580877fd0 100644 --- a/ydb/services/metadata/initializer/manager.cpp +++ b/ydb/services/metadata/initializer/manager.cpp @@ -17,4 +17,16 @@ NModifications::TOperationParsingResult TManager::DoBuildPatchFromSettings( return result; } +NMetadata::NModifications::IOperationsManager::TYqlConclusionStatus TManager::DoPrepare(NKqpProto::TKqpSchemeOperation& /*schemeOperation*/, const NYql::TObjectSettingsImpl& /*settings*/, + const NMetadata::IClassBehaviour::TPtr& /*manager*/, NMetadata::NModifications::IOperationsManager::TInternalModificationContext& /*context*/) const { + return NMetadata::NModifications::IOperationsManager::TYqlConclusionStatus::Fail( + "Prepare operations for INITIALIZATION objects are not supported"); +} + +NThreading::TFuture TManager::ExecutePrepared(const NKqpProto::TKqpSchemeOperation& /*schemeOperation*/, + const NMetadata::IClassBehaviour::TPtr& /*manager*/, const IOperationsManager::TExternalModificationContext& /*context*/) const { + return NThreading::MakeFuture(NMetadata::NModifications::IOperationsManager::TYqlConclusionStatus::Fail( + "Execution of prepare operations for INITIALIZATION objects is not supported")); +} + } diff --git a/ydb/services/metadata/initializer/manager.h b/ydb/services/metadata/initializer/manager.h index 036227b7b3cf..394e5490968b 100644 --- a/ydb/services/metadata/initializer/manager.h +++ b/ydb/services/metadata/initializer/manager.h @@ -20,6 +20,12 @@ class TManager: public NModifications::TGenericOperationsManager ExecutePrepared(const NKqpProto::TKqpSchemeOperation& schemeOperation, + const NMetadata::IClassBehaviour::TPtr& manager, const IOperationsManager::TExternalModificationContext& context) const override; + public: }; diff --git a/ydb/services/metadata/manager/abstract.cpp b/ydb/services/metadata/manager/abstract.cpp index 967c0b1f72f3..d22b5e8bcb5c 100644 --- a/ydb/services/metadata/manager/abstract.cpp +++ b/ydb/services/metadata/manager/abstract.cpp @@ -35,7 +35,7 @@ NKikimr::NMetadata::NModifications::TTableSchema& TTableSchema::AddColumn(const } NThreading::TFuture IOperationsManager::DropObject(const NYql::TDropObjectSettings& settings, - const ui32 nodeId, IClassBehaviour::TPtr manager, const TExternalModificationContext& context) const + const ui32 nodeId, const IClassBehaviour::TPtr& manager, const TExternalModificationContext& context) const { if (!NMetadata::NProvider::TServiceOperator::IsEnabled()) { return NThreading::MakeFuture(TYqlConclusionStatus::Fail("metadata provider service is disabled")); @@ -46,7 +46,7 @@ NThreading::TFuture IOperationsManager } NThreading::TFuture IOperationsManager::AlterObject(const NYql::TAlterObjectSettings& settings, - const ui32 nodeId, IClassBehaviour::TPtr manager, const TExternalModificationContext& context) const + const ui32 nodeId, const IClassBehaviour::TPtr& manager, const TExternalModificationContext& context) const { if (!NMetadata::NProvider::TServiceOperator::IsEnabled()) { return NThreading::MakeFuture(TYqlConclusionStatus::Fail("metadata provider service is disabled")); @@ -57,7 +57,7 @@ NThreading::TFuture IOperationsManager } NThreading::TFuture IOperationsManager::CreateObject(const NYql::TCreateObjectSettings& settings, - const ui32 nodeId, IClassBehaviour::TPtr manager, const TExternalModificationContext& context) const { + const ui32 nodeId, const IClassBehaviour::TPtr& manager, const TExternalModificationContext& context) const { if (!NMetadata::NProvider::TServiceOperator::IsEnabled()) { return NThreading::MakeFuture(TYqlConclusionStatus::Fail("metadata provider service is disabled")); } @@ -67,7 +67,7 @@ NThreading::TFuture IOperationsManager } NThreading::TFuture IOperationsManager::UpsertObject(const NYql::TUpsertObjectSettings& settings, - const ui32 nodeId, IClassBehaviour::TPtr manager, const TExternalModificationContext& context) const { + const ui32 nodeId, const IClassBehaviour::TPtr& manager, const TExternalModificationContext& context) const { if (!NMetadata::NProvider::TServiceOperator::IsEnabled()) { return NThreading::MakeFuture(TYqlConclusionStatus::Fail("metadata provider service is disabled")); } @@ -76,4 +76,48 @@ NThreading::TFuture IOperationsManager return DoModify(settings, nodeId, manager, internalContext); } +IOperationsManager::TYqlConclusionStatus IOperationsManager::PrepareUpsertObjectSchemeOperation(NKqpProto::TKqpSchemeOperation& schemeOperation, + const NYql::TUpsertObjectSettings& settings, const IClassBehaviour::TPtr& manager, + const TExternalModificationContext& context) const { + if (!NMetadata::NProvider::TServiceOperator::IsEnabled()) { + return TYqlConclusionStatus::Fail("metadata provider service is disabled"); + } + TInternalModificationContext internalContext(context); + internalContext.SetActivityType(EActivityType::Upsert); + return DoPrepare(schemeOperation, settings, manager, internalContext); +} + +IOperationsManager::TYqlConclusionStatus IOperationsManager::PrepareCreateObjectSchemeOperation(NKqpProto::TKqpSchemeOperation& schemeOperation, + const NYql::TCreateObjectSettings& settings, const IClassBehaviour::TPtr& manager, + const TExternalModificationContext& context) const { + if (!NMetadata::NProvider::TServiceOperator::IsEnabled()) { + return TYqlConclusionStatus::Fail("metadata provider service is disabled"); + } + TInternalModificationContext internalContext(context); + internalContext.SetActivityType(EActivityType::Create); + return DoPrepare(schemeOperation, settings, manager, internalContext); +} + +IOperationsManager::TYqlConclusionStatus IOperationsManager::PrepareAlterObjectSchemeOperation(NKqpProto::TKqpSchemeOperation& schemeOperation, + const NYql::TAlterObjectSettings& settings, const IClassBehaviour::TPtr& manager, + const TExternalModificationContext& context) const { + if (!NMetadata::NProvider::TServiceOperator::IsEnabled()) { + return TYqlConclusionStatus::Fail("metadata provider service is disabled"); + } + TInternalModificationContext internalContext(context); + internalContext.SetActivityType(EActivityType::Alter); + return DoPrepare(schemeOperation, settings, manager, internalContext); +} + +IOperationsManager::TYqlConclusionStatus IOperationsManager::PrepareDropObjectSchemeOperation(NKqpProto::TKqpSchemeOperation& schemeOperation, + const NYql::TDropObjectSettings& settings, const IClassBehaviour::TPtr& manager, + const TExternalModificationContext& context) const { + if (!NMetadata::NProvider::TServiceOperator::IsEnabled()) { + return TYqlConclusionStatus::Fail("metadata provider service is disabled"); + } + TInternalModificationContext internalContext(context); + internalContext.SetActivityType(EActivityType::Drop); + return DoPrepare(schemeOperation, settings, manager, internalContext); +} + } diff --git a/ydb/services/metadata/manager/abstract.h b/ydb/services/metadata/manager/abstract.h index b264dea182b6..0db55d5b7b92 100644 --- a/ydb/services/metadata/manager/abstract.h +++ b/ydb/services/metadata/manager/abstract.h @@ -2,6 +2,7 @@ #include "common.h" #include "table_record.h" +#include #include #include #include @@ -80,21 +81,43 @@ class IOperationsManager { YDB_ACCESSOR_DEF(std::optional, ActualSchema); protected: virtual NThreading::TFuture DoModify(const NYql::TObjectSettingsImpl& settings, const ui32 nodeId, - IClassBehaviour::TPtr manager, TInternalModificationContext& context) const = 0; + const IClassBehaviour::TPtr& manager, TInternalModificationContext& context) const = 0; + + virtual TYqlConclusionStatus DoPrepare(NKqpProto::TKqpSchemeOperation& schemeOperation, const NYql::TObjectSettingsImpl& settings, + const IClassBehaviour::TPtr& manager, TInternalModificationContext& context) const = 0; public: virtual ~IOperationsManager() = default; NThreading::TFuture UpsertObject(const NYql::TUpsertObjectSettings& settings, const ui32 nodeId, - IClassBehaviour::TPtr manager, const TExternalModificationContext& context) const; + const IClassBehaviour::TPtr& manager, const TExternalModificationContext& context) const; NThreading::TFuture CreateObject(const NYql::TCreateObjectSettings& settings, const ui32 nodeId, - IClassBehaviour::TPtr manager, const TExternalModificationContext& context) const; + const IClassBehaviour::TPtr& manager, const TExternalModificationContext& context) const; NThreading::TFuture AlterObject(const NYql::TAlterObjectSettings& settings, const ui32 nodeId, - IClassBehaviour::TPtr manager, const TExternalModificationContext& context) const; + const IClassBehaviour::TPtr& manager, const TExternalModificationContext& context) const; NThreading::TFuture DropObject(const NYql::TDropObjectSettings& settings, const ui32 nodeId, - IClassBehaviour::TPtr manager, const TExternalModificationContext& context) const; + const IClassBehaviour::TPtr& manager, const TExternalModificationContext& context) const; + + TYqlConclusionStatus PrepareUpsertObjectSchemeOperation(NKqpProto::TKqpSchemeOperation& schemeOperation, + const NYql::TUpsertObjectSettings& settings, const IClassBehaviour::TPtr& manager, + const TExternalModificationContext& context) const; + + TYqlConclusionStatus PrepareCreateObjectSchemeOperation(NKqpProto::TKqpSchemeOperation& schemeOperation, + const NYql::TCreateObjectSettings& settings, const IClassBehaviour::TPtr& manager, + const TExternalModificationContext& context) const; + + TYqlConclusionStatus PrepareAlterObjectSchemeOperation(NKqpProto::TKqpSchemeOperation& schemeOperation, + const NYql::TAlterObjectSettings& settings, const IClassBehaviour::TPtr& manager, + const TExternalModificationContext& context) const; + + TYqlConclusionStatus PrepareDropObjectSchemeOperation(NKqpProto::TKqpSchemeOperation& schemeOperation, + const NYql::TDropObjectSettings& settings, const IClassBehaviour::TPtr& manager, + const TExternalModificationContext& context) const; + + virtual NThreading::TFuture ExecutePrepared(const NKqpProto::TKqpSchemeOperation& schemeOperation, + const IClassBehaviour::TPtr& manager, const TExternalModificationContext& context) const = 0; const TTableSchema& GetSchema() const { Y_ABORT_UNLESS(!!ActualSchema); diff --git a/ydb/services/metadata/manager/alter_impl.h b/ydb/services/metadata/manager/alter_impl.h index c3aa3878732b..69fe03a72de3 100644 --- a/ydb/services/metadata/manager/alter_impl.h +++ b/ydb/services/metadata/manager/alter_impl.h @@ -66,7 +66,7 @@ class TModificationActorImpl: public NActors::TActorBootstrapped::TPtr manager, + const typename IObjectOperationsManager::TPtr& manager, const IOperationsManager::TInternalModificationContext& context) : ExternalController(controller) , Manager(manager) @@ -75,7 +75,7 @@ class TModificationActorImpl: public NActors::TActorBootstrapped::TPtr manager, + const typename IObjectOperationsManager::TPtr& manager, const IOperationsManager::TInternalModificationContext& context) : ExternalController(controller) , Manager(manager) @@ -84,7 +84,7 @@ class TModificationActorImpl: public NActors::TActorBootstrapped&& patches, IAlterController::TPtr controller, - typename IObjectOperationsManager::TPtr manager, + const typename IObjectOperationsManager::TPtr& manager, const IOperationsManager::TInternalModificationContext& context) : ExternalController(controller) , Manager(manager) @@ -94,7 +94,7 @@ class TModificationActorImpl: public NActors::TActorBootstrapped& patches, IAlterController::TPtr controller, - typename IObjectOperationsManager::TPtr manager, + const typename IObjectOperationsManager::TPtr& manager, const IOperationsManager::TInternalModificationContext& context) : ExternalController(controller) , Manager(manager) diff --git a/ydb/services/metadata/manager/generic_manager.h b/ydb/services/metadata/manager/generic_manager.h index b37aef78f111..51062e89ea75 100644 --- a/ydb/services/metadata/manager/generic_manager.h +++ b/ydb/services/metadata/manager/generic_manager.h @@ -33,7 +33,7 @@ class TGenericOperationsManager: public IObjectOperationsManager { protected: virtual NThreading::TFuture DoModify( const NYql::TObjectSettingsImpl& settings, const ui32 nodeId, - IClassBehaviour::TPtr manager, TInternalModificationContext& context) const override + const IClassBehaviour::TPtr& manager, TInternalModificationContext& context) const override { if (!manager) { return NThreading::MakeFuture(TYqlConclusionStatus::Fail("modification object behaviour not initialized")); diff --git a/ydb/services/metadata/secret/manager.cpp b/ydb/services/metadata/secret/manager.cpp index 8f4d28516c83..64e643b3dac9 100644 --- a/ydb/services/metadata/secret/manager.cpp +++ b/ydb/services/metadata/secret/manager.cpp @@ -46,6 +46,18 @@ NModifications::TOperationParsingResult TAccessManager::DoBuildPatchFromSettings return result; } +NMetadata::NModifications::IOperationsManager::TYqlConclusionStatus TAccessManager::DoPrepare(NKqpProto::TKqpSchemeOperation& /*schemeOperation*/, const NYql::TObjectSettingsImpl& /*settings*/, + const NMetadata::IClassBehaviour::TPtr& /*manager*/, NMetadata::NModifications::IOperationsManager::TInternalModificationContext& /*context*/) const { + return NMetadata::NModifications::IOperationsManager::TYqlConclusionStatus::Fail( + "Prepare operations for SECRET_ACCESS objects are not supported"); +} + +NThreading::TFuture TAccessManager::ExecutePrepared(const NKqpProto::TKqpSchemeOperation& /*schemeOperation*/, + const NMetadata::IClassBehaviour::TPtr& /*manager*/, const IOperationsManager::TExternalModificationContext& /*context*/) const { + return NThreading::MakeFuture(NMetadata::NModifications::IOperationsManager::TYqlConclusionStatus::Fail( + "Execution of prepare operations for SECRET_ACCESS objects is not supported")); +} + NModifications::TOperationParsingResult TSecretManager::DoBuildPatchFromSettings(const NYql::TObjectSettingsImpl& settings, TInternalModificationContext& context) const { static const TString ExtraPathSymbolsAllowed = "!\"#$%&'()*+,-.:;<=>?@[\\]^_`{|}~"; @@ -100,4 +112,16 @@ void TSecretManager::DoPrepareObjectsBeforeModification(std::vector&& p TActivationContext::Register(new TSecretPreparationActor(std::move(patchedObjects), controller, context)); } +NMetadata::NModifications::IOperationsManager::TYqlConclusionStatus TSecretManager::DoPrepare(NKqpProto::TKqpSchemeOperation& /*schemeOperation*/, const NYql::TObjectSettingsImpl& /*settings*/, + const NMetadata::IClassBehaviour::TPtr& /*manager*/, NMetadata::NModifications::IOperationsManager::TInternalModificationContext& /*context*/) const { + return NMetadata::NModifications::IOperationsManager::TYqlConclusionStatus::Fail( + "Prepare operations for SECRET objects are not supported"); +} + +NThreading::TFuture TSecretManager::ExecutePrepared(const NKqpProto::TKqpSchemeOperation& /*schemeOperation*/, + const NMetadata::IClassBehaviour::TPtr& /*manager*/, const IOperationsManager::TExternalModificationContext& /*context*/) const { + return NThreading::MakeFuture(NMetadata::NModifications::IOperationsManager::TYqlConclusionStatus::Fail( + "Execution of prepare operations for SECRET objects is not supported")); +} + } diff --git a/ydb/services/metadata/secret/manager.h b/ydb/services/metadata/secret/manager.h index 2ea535d901bd..5e0e4934b92e 100644 --- a/ydb/services/metadata/secret/manager.h +++ b/ydb/services/metadata/secret/manager.h @@ -15,6 +15,12 @@ class TSecretManager: public NModifications::TGenericOperationsManager virtual NModifications::TOperationParsingResult DoBuildPatchFromSettings( const NYql::TObjectSettingsImpl& settings, TInternalModificationContext& context) const override; + virtual IOperationsManager::TYqlConclusionStatus DoPrepare(NKqpProto::TKqpSchemeOperation& schemeOperation, const NYql::TObjectSettingsImpl& settings, + const NMetadata::IClassBehaviour::TPtr& manager, IOperationsManager::TInternalModificationContext& context) const override; + + virtual NThreading::TFuture ExecutePrepared(const NKqpProto::TKqpSchemeOperation& schemeOperation, + const NMetadata::IClassBehaviour::TPtr& manager, const IOperationsManager::TExternalModificationContext& context) const override; + public: }; @@ -26,7 +32,12 @@ class TAccessManager: public NModifications::TGenericOperationsManager virtual NModifications::TOperationParsingResult DoBuildPatchFromSettings(const NYql::TObjectSettingsImpl& settings, TInternalModificationContext& context) const override; -public: + + virtual IOperationsManager::TYqlConclusionStatus DoPrepare(NKqpProto::TKqpSchemeOperation& schemeOperation, const NYql::TObjectSettingsImpl& settings, + const NMetadata::IClassBehaviour::TPtr& manager, IOperationsManager::TInternalModificationContext& context) const override; + + virtual NThreading::TFuture ExecutePrepared(const NKqpProto::TKqpSchemeOperation& schemeOperation, + const NMetadata::IClassBehaviour::TPtr& manager, const IOperationsManager::TExternalModificationContext& context) const override; }; }