Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 92 additions & 15 deletions ydb/core/kqp/executer_actor/kqp_scheme_executer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include <ydb/core/tx/tx_proxy/proxy.h>
#include <ydb/core/kqp/session_actor/kqp_worker_common.h>
#include <ydb/core/tx/schemeshard/schemeshard_build_index.h>
#include <ydb/services/metadata/abstract/kqp_common.h>

namespace NKikimr::NKqp {

Expand Down Expand Up @@ -49,7 +50,7 @@ class TKqpSchemeExecuter : public TActorBootstrapped<TKqpSchemeExecuter> {
return NKikimrServices::TActivity::KQP_EXECUTER_ACTOR;
}

TKqpSchemeExecuter(TKqpPhyTxHolder::TConstPtr phyTx, NKikimrKqp::EQueryType queryType, const TActorId& target, const TMaybe<TString>& requestType,
TKqpSchemeExecuter(TKqpPhyTxHolder::TConstPtr phyTx, NKikimrKqp::EQueryType queryType, const TActorId& target, const TMaybe<TString>& requestType,
const TString& database, TIntrusiveConstPtr<NACLib::TUserToken> userToken,
bool temporary, TString sessionId, TIntrusivePtr<TUserRequestContext> ctx)
: PhyTx(phyTx)
Expand All @@ -69,13 +70,11 @@ class TKqpSchemeExecuter : public TActorBootstrapped<TKqpSchemeExecuter> {
}

void StartBuildOperation() {
const auto& schemeOp = PhyTx->GetSchemeOperation();
auto buildOp = schemeOp.GetBuildOperation();
Send(MakeTxProxyID(), new TEvTxUserProxy::TEvAllocateTxId);
Become(&TKqpSchemeExecuter::ExecuteState);
Become(&TKqpSchemeExecuter::ExecuteState);
}

void Bootstrap() {
void MakeSchemeOperationRequest() {
using TRequest = TEvTxUserProxy::TEvProposeTransaction;

auto ev = MakeHolder<TRequest>();
Expand Down Expand Up @@ -124,30 +123,44 @@ class TKqpSchemeExecuter : public TActorBootstrapped<TKqpSchemeExecuter> {
}

case NKqpProto::TKqpSchemeOperation::kAlterTable: {
auto modifyScheme = schemeOp.GetAlterTable();
const auto& modifyScheme = schemeOp.GetAlterTable();
ev->Record.MutableTransaction()->MutableModifyScheme()->CopyFrom(modifyScheme);
break;
}

case NKqpProto::TKqpSchemeOperation::kBuildOperation: {
auto buildOp = schemeOp.GetBuildOperation();
return StartBuildOperation();
}

case NKqpProto::TKqpSchemeOperation::kCreateUser: {
auto modifyScheme = schemeOp.GetCreateUser();
const auto& modifyScheme = schemeOp.GetCreateUser();
ev->Record.MutableTransaction()->MutableModifyScheme()->CopyFrom(modifyScheme);
break;
}

case NKqpProto::TKqpSchemeOperation::kAlterUser: {
auto modifyScheme = schemeOp.GetAlterUser();
const auto& modifyScheme = schemeOp.GetAlterUser();
ev->Record.MutableTransaction()->MutableModifyScheme()->CopyFrom(modifyScheme);
break;
}

case NKqpProto::TKqpSchemeOperation::kDropUser: {
auto modifyScheme = schemeOp.GetDropUser();
const auto& modifyScheme = schemeOp.GetDropUser();
ev->Record.MutableTransaction()->MutableModifyScheme()->CopyFrom(modifyScheme);
break;
}
case NKqpProto::TKqpSchemeOperation::kCreateExternalTable: {
const auto& modifyScheme = schemeOp.GetCreateExternalTable();
ev->Record.MutableTransaction()->MutableModifyScheme()->CopyFrom(modifyScheme);
break;
}
case NKqpProto::TKqpSchemeOperation::kAlterExternalTable: {
const auto& modifyScheme = schemeOp.GetAlterExternalTable();
ev->Record.MutableTransaction()->MutableModifyScheme()->CopyFrom(modifyScheme);
break;
}
case NKqpProto::TKqpSchemeOperation::kDropExternalTable: {
const auto& modifyScheme = schemeOp.GetDropExternalTable();
ev->Record.MutableTransaction()->MutableModifyScheme()->CopyFrom(modifyScheme);
break;
}
Expand Down Expand Up @@ -191,7 +204,7 @@ class TKqpSchemeExecuter : public TActorBootstrapped<TKqpSchemeExecuter> {
auto promise = NewPromise<IKqpGateway::TGenericResult>();

bool successOnNotExist = false;
bool failedOnAlreadyExists = false;
bool failedOnAlreadyExists = false;
// exists/not exists semantics supported only in the query service.
if (IsQueryService()) {
successOnNotExist = ev->Record.GetTransaction().GetModifyScheme().GetSuccessOnNotExist();
Expand All @@ -218,6 +231,57 @@ class TKqpSchemeExecuter : public TActorBootstrapped<TKqpSchemeExecuter> {
Become(&TKqpSchemeExecuter::ExecuteState);
}

void MakeObjectRequest() {
const auto& schemeOp = PhyTx->GetSchemeOperation();
NMetadata::IClassBehaviour::TPtr cBehaviour(NMetadata::IClassBehaviour::TFactory::Construct(schemeOp.GetObjectType()));
if (!cBehaviour) {
InternalError(TStringBuilder() << "Unsupported object type: \"" << schemeOp.GetObjectType() << "\"");
return;
}

if (!cBehaviour->GetOperationsManager()) {
InternalError(TStringBuilder() << "Object type \"" << schemeOp.GetObjectType() << "\" does not have manager for operations");
}

auto* actorSystem = TActivationContext::ActorSystem();
auto selfId = SelfId();

NMetadata::NModifications::IOperationsManager::TExternalModificationContext context;
context.SetDatabase(Database);
context.SetActorSystem(actorSystem);
if (UserToken) {
context.SetUserToken(*UserToken);
}

auto resultFuture = cBehaviour->GetOperationsManager()->ExecutePrepared(schemeOp, cBehaviour, context);

using TResultFuture = NThreading::TFuture<NMetadata::NModifications::IOperationsManager::TYqlConclusionStatus>;
resultFuture.Subscribe([actorSystem, selfId](const TResultFuture& f) {
const auto& status = f.GetValue();
auto ev = MakeHolder<TEvPrivate::TEvResult>();
if (status.Ok()) {
ev->Result.SetSuccess();
} else {
ev->Result.SetStatus(status.GetStatus());
if (TString message = status.GetErrorMessage()) {
ev->Result.AddIssue(NYql::TIssue{message});
}
}
actorSystem->Send(selfId, ev.Release());
});

Become(&TKqpSchemeExecuter::ObjectExecuteState);
}

void Bootstrap() {
const auto& schemeOp = PhyTx->GetSchemeOperation();
if (schemeOp.GetObjectType()) {
MakeObjectRequest();
} else {
MakeSchemeOperationRequest();
}
}

public:
STATEFN(ExecuteState) {
try {
Expand All @@ -240,6 +304,19 @@ class TKqpSchemeExecuter : public TActorBootstrapped<TKqpSchemeExecuter> {
}
}

STATEFN(ObjectExecuteState) {
try {
switch (ev->GetTypeRewrite()) {
hFunc(TEvPrivate::TEvResult, HandleExecute);
hFunc(TEvKqp::TEvAbortExecution, HandleAbortExecution);
default:
UnexpectedEvent("ObjectExecuteState", ev->GetTypeRewrite());
}
} catch (const yexception& e) {
InternalError(e.what());
}
}


void Handle(TEvTxUserProxy::TEvAllocateTxIdResult::TPtr& ev) {
const auto* msg = ev->Get();
Expand All @@ -250,18 +327,18 @@ class TKqpSchemeExecuter : public TActorBootstrapped<TKqpSchemeExecuter> {

void Navigate(const TActorId& schemeCache) {
const auto& schemeOp = PhyTx->GetSchemeOperation();
auto buildOp = schemeOp.GetBuildOperation();
const auto& buildOp = schemeOp.GetBuildOperation();
const auto& path = buildOp.source_path();

const auto paths = NKikimr::SplitPath(path);
if (paths.empty()) {
TString error = TStringBuilder() << "Failed to split table path " << path;
return ReplyErrorAndDie(Ydb::StatusIds::BAD_REQUEST, NYql::TIssue(error));
}

auto request = std::make_unique<NSchemeCache::TSchemeCacheNavigate>();

request->DatabaseName = Database;
request->DatabaseName = Database;
auto& entry = request->ResultSet.emplace_back();
entry.Operation = NSchemeCache::TSchemeCacheNavigate::OpPath;
entry.Path = ::NKikimr::SplitPath(path);
Expand Down Expand Up @@ -312,7 +389,7 @@ class TKqpSchemeExecuter : public TActorBootstrapped<TKqpSchemeExecuter> {
}

const auto& schemeOp = PhyTx->GetSchemeOperation();
auto buildOp = schemeOp.GetBuildOperation();
const auto& buildOp = schemeOp.GetBuildOperation();
SetSchemeShardId(domainInfo->ExtractSchemeShard());
auto req = std::make_unique<NSchemeShard::TEvIndexBuilder::TEvCreateRequest>(TxId, Database, buildOp);
ForwardToSchemeShard(std::move(req));
Expand Down
6 changes: 3 additions & 3 deletions ydb/core/kqp/executer_actor/ut/kqp_executer_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,15 @@ namespace {

[[maybe_unused]]
NKqpProto::TKqpPhyTx BuildTxPlan(const TString& sql, TIntrusivePtr<IKqpGateway> gateway,
const TKikimrConfiguration::TPtr& config)
const TKikimrConfiguration::TPtr& config, NActors::TActorId* actorSystem)
{
auto cluster = TString(DefaultKikimrClusterName);

TExprContext moduleCtx;
IModuleResolver::TPtr moduleResolver;
UNIT_ASSERT(GetYqlDefaultModuleResolver(moduleCtx, moduleResolver));

auto qp = CreateKqpHost(gateway, cluster, "/Root", config, moduleResolver, NYql::IHTTPGateway::Make());
auto qp = CreateKqpHost(gateway, cluster, "/Root", config, moduleResolver, NYql::IHTTPGateway::Make(), nullptr, false, false, nullptr, actorSystem);
auto result = qp->SyncPrepareDataQuery(sql, IKqpHost::TPrepareSettings());
result.Issues().PrintTo(Cerr);
UNIT_ASSERT(result.Success());
Expand Down Expand Up @@ -97,7 +97,7 @@ Y_UNIT_TEST_SUITE(KqpExecuter) {

UPSERT INTO [Root/EightShard]
SELECT * FROM $itemsSource;
)", gateway, ctx);
)", gateway, ctx, kikimr.GetTestServer().GetRuntime()->GetAnyNodeActorSystem());

LogTxPlan(kikimr, tx);

Expand Down
1 change: 1 addition & 0 deletions ydb/core/kqp/executer_actor/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ PEERDIR(
ydb/core/protos
ydb/core/tx/long_tx_service/public
ydb/core/ydb_convert
ydb/services/metadata/abstract
ydb/library/mkql_proto
ydb/library/mkql_proto/protos
ydb/library/yql/dq/actors/compute
Expand Down
Loading