Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ydb/core/kqp/compile_service/kqp_compile_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ class TKqpCompileActor : public TActorBootstrapped<TKqpCompileActor> {
Config->FeatureFlags = AppData(ctx)->FeatureFlags;

KqpHost = CreateKqpHost(Gateway, QueryId.Cluster, QueryId.Database, Config, ModuleResolverState->ModuleResolver,
FederatedQuerySetup, AppData(ctx)->FunctionRegistry, false, false, std::move(TempTablesState));
FederatedQuerySetup, UserToken, AppData(ctx)->FunctionRegistry, false, false, std::move(TempTablesState));

IKqpHost::TPrepareSettings prepareSettings;
prepareSettings.DocumentApiRestricted = QueryId.Settings.DocumentApiRestricted;
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/kqp/executer_actor/kqp_scheme_executer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ class TKqpSchemeExecuter : public TActorBootstrapped<TKqpSchemeExecuter> {
context.SetUserToken(*UserToken);
}

auto resultFuture = cBehaviour->GetOperationsManager()->ExecutePrepared(schemeOp, cBehaviour, context);
auto resultFuture = cBehaviour->GetOperationsManager()->ExecutePrepared(schemeOp, SelfId().NodeId(), cBehaviour, context);

using TResultFuture = NThreading::TFuture<NMetadata::NModifications::IOperationsManager::TYqlConclusionStatus>;
resultFuture.Subscribe([actorSystem, selfId](const TResultFuture& f) {
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/kqp/executer_actor/ut/kqp_executer_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ NKqpProto::TKqpPhyTx BuildTxPlan(const TString& sql, TIntrusivePtr<IKqpGateway>
IModuleResolver::TPtr moduleResolver;
UNIT_ASSERT(GetYqlDefaultModuleResolver(moduleCtx, moduleResolver));

auto qp = CreateKqpHost(gateway, cluster, "/Root", config, moduleResolver, NYql::IHTTPGateway::Make(), nullptr, false, false, nullptr, actorSystem);
auto qp = CreateKqpHost(gateway, cluster, "/Root", config, moduleResolver, NYql::IHTTPGateway::Make(), nullptr, nullptr, false, false, nullptr, actorSystem);
auto result = qp->SyncPrepareDataQuery(sql, IKqpHost::TPrepareSettings());
result.Issues().PrintTo(Cerr);
UNIT_ASSERT(result.Success());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ void TExternalDataSourceManager::PrepareDropExternalDataSource(NKqpProto::TKqpSc
}

NThreading::TFuture<NMetadata::NModifications::IOperationsManager::TYqlConclusionStatus> TExternalDataSourceManager::ExecutePrepared(const NKqpProto::TKqpSchemeOperation& schemeOperation,
const NMetadata::IClassBehaviour::TPtr& /*manager*/, const IOperationsManager::TExternalModificationContext& context) const {
const ui32 /*nodeId*/, const NMetadata::IClassBehaviour::TPtr& /*manager*/, const IOperationsManager::TExternalModificationContext& context) const {
using TRequest = TEvTxUserProxy::TEvProposeTransaction;

auto ev = MakeHolder<TRequest>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ class TExternalDataSourceManager: public NMetadata::NModifications::IOperationsM
const NMetadata::IClassBehaviour::TPtr& manager, IOperationsManager::TInternalModificationContext& context) const override;

NThreading::TFuture<IOperationsManager::TYqlConclusionStatus> ExecutePrepared(const NKqpProto::TKqpSchemeOperation& schemeOperation,
const NMetadata::IClassBehaviour::TPtr& manager, const IOperationsManager::TExternalModificationContext& context) const override;
const ui32 nodeId, const NMetadata::IClassBehaviour::TPtr& manager, const IOperationsManager::TExternalModificationContext& context) const override;

public:
using NMetadata::NModifications::IOperationsManager::TYqlConclusionStatus;
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/kqp/gateway/behaviour/tablestore/manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ NMetadata::NModifications::IOperationsManager::TYqlConclusionStatus TTableStoreM
}

NThreading::TFuture<NMetadata::NModifications::IOperationsManager::TYqlConclusionStatus> TTableStoreManager::ExecutePrepared(const NKqpProto::TKqpSchemeOperation& /*schemeOperation*/,
const NMetadata::IClassBehaviour::TPtr& /*manager*/, const IOperationsManager::TExternalModificationContext& /*context*/) const {
const ui32 /*nodeId*/, const NMetadata::IClassBehaviour::TPtr& /*manager*/, const IOperationsManager::TExternalModificationContext& /*context*/) const {
return NThreading::MakeFuture(NMetadata::NModifications::IOperationsManager::TYqlConclusionStatus::Fail(
"Execution of prepare operations for TABLE objects is not supported"));
}
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/kqp/gateway/behaviour/tablestore/manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ class TTableStoreManager: public NMetadata::NModifications::IOperationsManager {
const NMetadata::IClassBehaviour::TPtr& manager, IOperationsManager::TInternalModificationContext& context) const override;

NThreading::TFuture<IOperationsManager::TYqlConclusionStatus> ExecutePrepared(const NKqpProto::TKqpSchemeOperation& schemeOperation,
const NMetadata::IClassBehaviour::TPtr& manager, const IOperationsManager::TExternalModificationContext& context) const override;
const ui32 nodeId, const NMetadata::IClassBehaviour::TPtr& manager, const IOperationsManager::TExternalModificationContext& context) const override;
public:
TTableStoreManager(bool isStandalone)
: IsStandalone(isStandalone)
Expand Down
5 changes: 3 additions & 2 deletions ydb/core/kqp/gateway/behaviour/view/manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ TViewManager::TYqlConclusionStatus TViewManager::DoPrepare(NKqpProto::TKqpScheme
const NMetadata::IClassBehaviour::TPtr& manager,
TInternalModificationContext& context) const {
Y_UNUSED(manager);

try {
CheckFeatureFlag(context);
switch (context.GetActivityType()) {
Expand All @@ -176,9 +176,10 @@ TViewManager::TYqlConclusionStatus TViewManager::DoPrepare(NKqpProto::TKqpScheme
}

NThreading::TFuture<TYqlConclusionStatus> TViewManager::ExecutePrepared(const NKqpProto::TKqpSchemeOperation& schemeOperation,
const ui32 nodeId,
const NMetadata::IClassBehaviour::TPtr& manager,
const TExternalModificationContext& context) const {
Y_UNUSED(manager);
Y_UNUSED(manager, nodeId);

auto proposal = MakeHolder<TEvTxUserProxy::TEvProposeTransaction>();
proposal->Record.SetDatabaseName(context.GetDatabase());
Expand Down
1 change: 1 addition & 0 deletions ydb/core/kqp/gateway/behaviour/view/manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ class TViewManager: public NMetadata::NModifications::IOperationsManager {
TInternalModificationContext& context) const override;

NThreading::TFuture<TYqlConclusionStatus> ExecutePrepared(const NKqpProto::TKqpSchemeOperation& schemeOperation,
const ui32 nodeId,
const NMetadata::IClassBehaviour::TPtr& manager,
const TExternalModificationContext& context) const override;

Expand Down
3 changes: 3 additions & 0 deletions ydb/core/kqp/host/kqp_gateway_proxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1022,6 +1022,9 @@ class TKqpGatewayProxy : public IKikimrGateway {
NMetadata::NModifications::IOperationsManager::TExternalModificationContext context;
context.SetDatabase(SessionCtx->GetDatabase());
context.SetActorSystem(ActorSystem);
if (SessionCtx->GetUserToken()) {
context.SetUserToken(*SessionCtx->GetUserToken());
}

auto& phyTx = phyTxRemover.Capture(SessionCtx->Query().PreparingQuery->MutablePhysicalQuery());
phyTx.SetType(NKqpProto::TKqpPhyTx::TYPE_SCHEME);
Expand Down
8 changes: 4 additions & 4 deletions ydb/core/kqp/host/kqp_host.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -921,7 +921,7 @@ class TKqpHost : public IKqpHost {
public:
TKqpHost(TIntrusivePtr<IKqpGateway> gateway, const TString& cluster, const TString& database,
TKikimrConfiguration::TPtr config, IModuleResolver::TPtr moduleResolver,
std::optional<TKqpFederatedQuerySetup> federatedQuerySetup,
std::optional<TKqpFederatedQuerySetup> federatedQuerySetup, const TIntrusiveConstPtr<NACLib::TUserToken>& userToken,
const NKikimr::NMiniKQL::IFunctionRegistry* funcRegistry, bool keepConfigChanges,
bool isInternalCall, TKqpTempTablesState::TConstPtr tempTablesState = nullptr,
NActors::TActorSystem* actorSystem = nullptr)
Expand All @@ -932,7 +932,7 @@ class TKqpHost : public IKqpHost {
, KeepConfigChanges(keepConfigChanges)
, IsInternalCall(isInternalCall)
, FederatedQuerySetup(federatedQuerySetup)
, SessionCtx(new TKikimrSessionContext(funcRegistry, config, TAppData::TimeProvider, TAppData::RandomProvider))
, SessionCtx(new TKikimrSessionContext(funcRegistry, config, TAppData::TimeProvider, TAppData::RandomProvider, userToken))
, TypesCtx(MakeIntrusive<TTypeAnnotationContext>())
, PlanBuilder(CreatePlanBuilder(*TypesCtx))
, FakeWorld(ExprCtx->NewWorld(TPosition()))
Expand Down Expand Up @@ -1672,11 +1672,11 @@ Ydb::Table::QueryStatsCollection::Mode GetStatsMode(NYql::EKikimrStatsMode stats

TIntrusivePtr<IKqpHost> CreateKqpHost(TIntrusivePtr<IKqpGateway> gateway,
const TString& cluster, const TString& database, TKikimrConfiguration::TPtr config, IModuleResolver::TPtr moduleResolver,
std::optional<TKqpFederatedQuerySetup> federatedQuerySetup,
std::optional<TKqpFederatedQuerySetup> federatedQuerySetup, const TIntrusiveConstPtr<NACLib::TUserToken>& userToken,
const NKikimr::NMiniKQL::IFunctionRegistry* funcRegistry, bool keepConfigChanges, bool isInternalCall,
TKqpTempTablesState::TConstPtr tempTablesState, NActors::TActorSystem* actorSystem)
{
return MakeIntrusive<TKqpHost>(gateway, cluster, database, config, moduleResolver, federatedQuerySetup, funcRegistry,
return MakeIntrusive<TKqpHost>(gateway, cluster, database, config, moduleResolver, federatedQuerySetup, userToken, funcRegistry,
keepConfigChanges, isInternalCall, std::move(tempTablesState), actorSystem);
}

Expand Down
3 changes: 2 additions & 1 deletion ydb/core/kqp/host/kqp_host.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include <ydb/core/kqp/gateway/kqp_gateway.h>
#include <ydb/core/kqp/provider/yql_kikimr_provider.h>
#include <ydb/core/kqp/host/kqp_translate.h>
#include <ydb/library/aclib/aclib.h>
#include <ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h>
#include <ydb/library/yql/providers/common/token_accessor/client/factory.h>

Expand Down Expand Up @@ -108,7 +109,7 @@ class IKqpHost : public TThrRefBase {

TIntrusivePtr<IKqpHost> CreateKqpHost(TIntrusivePtr<IKqpGateway> gateway,
const TString& cluster, const TString& database, NYql::TKikimrConfiguration::TPtr config, NYql::IModuleResolver::TPtr moduleResolver,
std::optional<TKqpFederatedQuerySetup> federatedQuerySetup, const NKikimr::NMiniKQL::IFunctionRegistry* funcRegistry = nullptr,
std::optional<TKqpFederatedQuerySetup> federatedQuerySetup, const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, const NKikimr::NMiniKQL::IFunctionRegistry* funcRegistry = nullptr,
bool keepConfigChanges = false, bool isInternalCall = false, TKqpTempTablesState::TConstPtr tempTablesState = nullptr,
NActors::TActorSystem* actorSystem = nullptr /*take from TLS by default*/);

Expand Down
10 changes: 9 additions & 1 deletion ydb/core/kqp/provider/yql_kikimr_provider.h
Original file line number Diff line number Diff line change
Expand Up @@ -445,11 +445,14 @@ class TKikimrSessionContext : public TThrRefBase {
TKikimrConfiguration::TPtr config,
TIntrusivePtr<ITimeProvider> timeProvider,
TIntrusivePtr<IRandomProvider> randomProvider,
const TIntrusiveConstPtr<NACLib::TUserToken>& userToken,
TIntrusivePtr<TKikimrTransactionContextBase> txCtx = nullptr)
: Configuration(config)
, TablesData(MakeIntrusive<TKikimrTablesData>())
, QueryCtx(MakeIntrusive<TKikimrQueryContext>(functionRegistry, timeProvider, randomProvider))
, TxCtx(txCtx) {}
, TxCtx(txCtx)
, UserToken(userToken)
{}

TKikimrSessionContext(const TKikimrSessionContext&) = delete;
TKikimrSessionContext& operator=(const TKikimrSessionContext&) = delete;
Expand Down Expand Up @@ -518,6 +521,10 @@ class TKikimrSessionContext : public TThrRefBase {
TempTablesState = tempTablesState;
}

const TIntrusiveConstPtr<NACLib::TUserToken>& GetUserToken() const {
return UserToken;
}

private:
TString UserName;
TString Cluster;
Expand All @@ -527,6 +534,7 @@ class TKikimrSessionContext : public TThrRefBase {
TIntrusivePtr<TKikimrQueryContext> QueryCtx;
TIntrusivePtr<TKikimrTransactionContextBase> TxCtx;
NKikimr::NKqp::TKqpTempTablesState::TConstPtr TempTablesState;
TIntrusiveConstPtr<NACLib::TUserToken> UserToken;
};

TIntrusivePtr<IDataProvider> CreateKikimrDataSource(
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/kqp/session_actor/kqp_worker_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ class TKqpWorkerActor : public TActorBootstrapped<TKqpWorkerActor> {
Config->FeatureFlags = AppData(ctx)->FeatureFlags;

KqpHost = CreateKqpHost(Gateway, Settings.Cluster, Settings.Database, Config, ModuleResolverState->ModuleResolver,
FederatedQuerySetup, AppData(ctx)->FunctionRegistry, !Settings.LongSession, false);
FederatedQuerySetup, QueryState->RequestEv->GetUserToken(), AppData(ctx)->FunctionRegistry, !Settings.LongSession, false);

auto& queryRequest = QueryState->RequestEv;
QueryState->ProxyRequestId = proxyRequestId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,7 @@ Y_UNIT_TEST_SUITE(FederatedQueryJoin) {
}

std::shared_ptr<NKikimr::NKqp::TKikimrRunner> kikimr = MakeKikimrRunnerWithConnector();

auto tableCLient = kikimr->GetTableClient();
auto session = tableCLient.CreateSession().GetValueSync().GetSession();
auto queryClient = kikimr->GetQueryClient();

// external tables to pg/ch
{
Expand Down Expand Up @@ -104,7 +102,7 @@ Y_UNIT_TEST_SUITE(FederatedQueryJoin) {
"ch_database"_a = GetChDatabase(),
"ch_user"_a = GetChUser(),
"ch_password"_a = GetChPassword());
auto result = session.ExecuteSchemeQuery(sql).GetValueSync();
auto result = queryClient.ExecuteQuery(sql, NYdb::NQuery::TTxControl::NoTx()).GetValueSync();
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
}

Expand All @@ -116,7 +114,6 @@ Y_UNIT_TEST_SUITE(FederatedQueryJoin) {
WHERE ch.key > 998
)sql";

auto queryClient = kikimr->GetQueryClient();
auto result = queryClient.ExecuteQuery(sql, NYdb::NQuery::TTxControl::BeginTx().CommitTx()).GetValueSync();
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ using namespace NTestUtils;
using namespace fmt::literals;

Y_UNIT_TEST_SUITE(KqpFederatedSchemeTest) {
Y_UNIT_TEST(CreateExternalTable) {
Y_UNIT_TEST(ExternalTableDdl) {
enum EEx {
Empty,
IfExists,
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/kqp/ut/indexes/kqp_indexes_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ TIntrusivePtr<IKqpHost> CreateKikimrQueryProcessor(TIntrusivePtr<IKqpGateway> ga

auto federatedQuerySetup = std::make_optional<TKqpFederatedQuerySetup>({NYql::IHTTPGateway::Make(), nullptr, nullptr, nullptr, {}, {}});
return NKqp::CreateKqpHost(gateway, cluster, "/Root", kikimrConfig, moduleResolver,
federatedQuerySetup, funcRegistry, funcRegistry, keepConfigChanges, nullptr, actorSystem);
federatedQuerySetup, nullptr, funcRegistry, funcRegistry, keepConfigChanges, nullptr, actorSystem);
}

NYql::NNodes::TExprBase GetExpr(const TString& ast, NYql::TExprContext& ctx, NYql::IModuleResolver* moduleResolver) {
Expand Down
92 changes: 92 additions & 0 deletions ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1160,6 +1160,98 @@ Y_UNIT_TEST_SUITE(KqpQueryService) {
});
}

Y_UNIT_TEST(DdlSecret) {
NKikimrConfig::TAppConfig appConfig;
appConfig.MutableTableServiceConfig()->SetEnablePreparedDdl(true);
auto setting = NKikimrKqp::TKqpSetting();
auto serverSettings = TKikimrSettings()
.SetAppConfig(appConfig)
.SetKqpSettings({setting});

TKikimrRunner kikimr(serverSettings);
auto db = kikimr.GetQueryClient();

enum EEx {
Empty,
IfExists,
IfNotExists,
};

auto executeSql = [&](const TString& sql, bool expectSuccess) {
Cerr << "Execute SQL:\n" << sql << Endl;

auto result = db.ExecuteQuery(sql, TTxControl::NoTx()).ExtractValueSync();
if (expectSuccess) {
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
} else {
UNIT_ASSERT_VALUES_UNEQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
}
UNIT_ASSERT(result.GetResultSets().empty());
};

auto checkCreate = [&](bool expectSuccess, EEx exMode, int nameSuffix) {
UNIT_ASSERT_UNEQUAL(exMode, EEx::IfExists);
const TString ifNotExistsStatement = exMode == EEx::IfNotExists ? "IF NOT EXISTS" : "";
const TString sql = fmt::format(R"sql(
CREATE OBJECT {if_not_exists} my_secret_{name_suffix} (TYPE SECRET) WITH (value="qwerty");
)sql",
"if_not_exists"_a = ifNotExistsStatement,
"name_suffix"_a = nameSuffix
);

executeSql(sql, expectSuccess);
};

auto checkAlter = [&](bool expectSuccess, int nameSuffix) {
const TString sql = fmt::format(R"sql(
ALTER OBJECT my_secret_{name_suffix} (TYPE SECRET) SET value = "abcde";
)sql",
"name_suffix"_a = nameSuffix
);

executeSql(sql, expectSuccess);
};

auto checkUpsert = [&](bool expectSuccess, int nameSuffix) {
const TString sql = fmt::format(R"sql(
UPSERT OBJECT my_secret_{name_suffix} (TYPE SECRET) WITH value = "edcba";
)sql",
"name_suffix"_a = nameSuffix
);

executeSql(sql, expectSuccess);
};

auto checkDrop = [&](bool expectSuccess, EEx exMode, int nameSuffix) {
UNIT_ASSERT_UNEQUAL(exMode, EEx::IfNotExists);
const TString ifExistsStatement = exMode == EEx::IfExists ? "IF EXISTS" : "";
const TString sql = fmt::format(R"sql(
DROP OBJECT {if_exists} my_secret_{name_suffix} (TYPE SECRET);
)sql",
"if_exists"_a = ifExistsStatement,
"name_suffix"_a = nameSuffix
);

executeSql(sql, expectSuccess);
};

checkCreate(true, EEx::Empty, 0);
checkCreate(false, EEx::Empty, 0);
checkAlter(true, 0);
checkAlter(false, 2); // not exists
checkDrop(true, EEx::Empty, 0);
checkDrop(true, EEx::Empty, 0); // we don't check object existence

checkCreate(true, EEx::IfNotExists, 1);
checkCreate(true, EEx::IfNotExists, 1);
checkDrop(true, EEx::IfExists, 1);
checkDrop(true, EEx::IfExists, 1);

checkUpsert(true, 2);
checkCreate(false, EEx::Empty, 2); // already exists
checkUpsert(true, 2);
}

Y_UNIT_TEST(DdlCache) {
NKikimrConfig::TAppConfig appConfig;
appConfig.MutableTableServiceConfig()->SetEnablePreparedDdl(true);
Expand Down
15 changes: 15 additions & 0 deletions ydb/core/protos/kqp_physical.proto
Original file line number Diff line number Diff line change
Expand Up @@ -374,6 +374,17 @@ message TKqpPhyResult {
optional uint32 QueryResultIndex = 5;
}

message TKqpPhyMetadataOperation {
message TColumnValue {
string Column = 1;
Ydb.Value Value = 2;
}
repeated TColumnValue ColumnValues = 1;

optional bool SuccessOnNotExist = 2;
optional bool SuccessOnAlreadyExists = 3;
}

message TKqpSchemeOperation {
enum EFlags {
FLAG_UNSPECIFIED = 0;
Expand Down Expand Up @@ -409,6 +420,10 @@ message TKqpSchemeOperation {
NKikimrSchemeOp.TModifyScheme CreateView = 20;
NKikimrSchemeOp.TModifyScheme AlterView = 21;
NKikimrSchemeOp.TModifyScheme DropView = 22;
TKqpPhyMetadataOperation CreateObject = 23;
TKqpPhyMetadataOperation UpsertObject = 24;
TKqpPhyMetadataOperation AlterObject = 25;
TKqpPhyMetadataOperation DropObject = 26;
}
}

Expand Down
Loading