Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions ydb/core/kqp/common/events/workload_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,16 @@

namespace NKikimr::NKqp::NWorkload {

struct TEvSubscribeOnPoolChanges : public NActors::TEventLocal<TEvSubscribeOnPoolChanges, TKqpWorkloadServiceEvents::EvSubscribeOnPoolChanges> {
TEvSubscribeOnPoolChanges(const TString& database, const TString& poolId)
: Database(database)
, PoolId(poolId)
{}

const TString Database;
const TString PoolId;
};

struct TEvPlaceRequestIntoPool : public NActors::TEventLocal<TEvPlaceRequestIntoPool, TKqpWorkloadServiceEvents::EvPlaceRequestIntoPool> {
TEvPlaceRequestIntoPool(const TString& database, const TString& sessionId, const TString& poolId, TIntrusiveConstPtr<NACLib::TUserToken> userToken)
: Database(database)
Expand Down Expand Up @@ -80,4 +90,14 @@ struct TEvUpdatePoolInfo : public NActors::TEventLocal<TEvUpdatePoolInfo, TKqpWo
const std::optional<NACLib::TSecurityObject> SecurityObject;
};

struct TEvUpdateDatabaseInfo : public NActors::TEventLocal<TEvUpdateDatabaseInfo, TKqpWorkloadServiceEvents::EvUpdateDatabaseInfo> {
TEvUpdateDatabaseInfo(const TString& database, bool serverless)
: Database(database)
, Serverless(serverless)
{}

const TString Database;
const bool Serverless;
};

} // NKikimr::NKqp::NWorkload
2 changes: 2 additions & 0 deletions ydb/core/kqp/common/simple/kqp_event_ids.h
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,8 @@ struct TKqpWorkloadServiceEvents {
EvCleanupRequest,
EvCleanupResponse,
EvUpdatePoolInfo,
EvUpdateDatabaseInfo,
EvSubscribeOnPoolChanges,
};
};

Expand Down
2 changes: 1 addition & 1 deletion ydb/core/kqp/compile_service/kqp_compile_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ class TKqpCompileActor : public TActorBootstrapped<TKqpCompileActor> {

KqpHost = CreateKqpHost(Gateway, QueryId.Cluster, QueryId.Database, Config, ModuleResolverState->ModuleResolver,
FederatedQuerySetup, UserToken, GUCSettings, QueryServiceConfig, ApplicationName, AppData(ctx)->FunctionRegistry,
false, false, std::move(TempTablesState), nullptr, SplitCtx);
false, false, std::move(TempTablesState), nullptr, SplitCtx, UserRequestContext);

IKqpHost::TPrepareSettings prepareSettings;
prepareSettings.DocumentApiRestricted = QueryId.Settings.DocumentApiRestricted;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ NJson::TJsonValue TResourcePoolClassifierConfig::GetDebugJson() const {
}

bool TResourcePoolClassifierConfig::operator==(const TResourcePoolClassifierConfig& other) const {
return std::tie(Database, Name, Rank, ConfigJson) != std::tie(other.Database, other.Name, other.Rank, other.ConfigJson);
return std::tie(Database, Name, Rank, ConfigJson) == std::tie(other.Database, other.Name, other.Rank, other.ConfigJson);
}

NMetadata::IClassBehaviour::TPtr TResourcePoolClassifierConfig::GetBehaviour() {
Expand Down
9 changes: 5 additions & 4 deletions ydb/core/kqp/host/kqp_host.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1033,7 +1033,8 @@ class TKqpHost : public IKqpHost {
std::optional<TKqpFederatedQuerySetup> federatedQuerySetup, const TIntrusiveConstPtr<NACLib::TUserToken>& userToken,
const NKikimr::NMiniKQL::IFunctionRegistry* funcRegistry, bool keepConfigChanges, bool isInternalCall,
TKqpTempTablesState::TConstPtr tempTablesState = nullptr, NActors::TActorSystem* actorSystem = nullptr,
NYql::TExprContext* ctx = nullptr, const NKikimrConfig::TQueryServiceConfig& queryServiceConfig = NKikimrConfig::TQueryServiceConfig())
NYql::TExprContext* ctx = nullptr, const NKikimrConfig::TQueryServiceConfig& queryServiceConfig = NKikimrConfig::TQueryServiceConfig(),
const TIntrusivePtr<TUserRequestContext>& userRequestContext = nullptr)
: Gateway(gateway)
, Cluster(cluster)
, GUCSettings(gUCSettings)
Expand All @@ -1044,7 +1045,7 @@ class TKqpHost : public IKqpHost {
, KeepConfigChanges(keepConfigChanges)
, IsInternalCall(isInternalCall)
, FederatedQuerySetup(federatedQuerySetup)
, SessionCtx(new TKikimrSessionContext(funcRegistry, config, TAppData::TimeProvider, TAppData::RandomProvider, userToken))
, SessionCtx(new TKikimrSessionContext(funcRegistry, config, TAppData::TimeProvider, TAppData::RandomProvider, userToken, nullptr, userRequestContext))
, Config(config)
, TypesCtx(MakeIntrusive<TTypeAnnotationContext>())
, PlanBuilder(CreatePlanBuilder(*TypesCtx))
Expand Down Expand Up @@ -1958,10 +1959,10 @@ TIntrusivePtr<IKqpHost> CreateKqpHost(TIntrusivePtr<IKqpGateway> gateway, const
const TString& database, TKikimrConfiguration::TPtr config, IModuleResolver::TPtr moduleResolver,
std::optional<TKqpFederatedQuerySetup> federatedQuerySetup, const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, const TGUCSettings::TPtr& gUCSettings,
const NKikimrConfig::TQueryServiceConfig& queryServiceConfig, const TMaybe<TString>& applicationName, const NKikimr::NMiniKQL::IFunctionRegistry* funcRegistry, bool keepConfigChanges,
bool isInternalCall, TKqpTempTablesState::TConstPtr tempTablesState, NActors::TActorSystem* actorSystem, NYql::TExprContext* ctx)
bool isInternalCall, TKqpTempTablesState::TConstPtr tempTablesState, NActors::TActorSystem* actorSystem, NYql::TExprContext* ctx, const TIntrusivePtr<TUserRequestContext>& userRequestContext)
{
return MakeIntrusive<TKqpHost>(gateway, cluster, database, gUCSettings, applicationName, config, moduleResolver, federatedQuerySetup, userToken, funcRegistry,
keepConfigChanges, isInternalCall, std::move(tempTablesState), actorSystem, ctx, queryServiceConfig);
keepConfigChanges, isInternalCall, std::move(tempTablesState), actorSystem, ctx, queryServiceConfig, userRequestContext);
}

} // namespace NKqp
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/kqp/host/kqp_host.h
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ TIntrusivePtr<IKqpHost> CreateKqpHost(TIntrusivePtr<IKqpGateway> gateway,
const NKikimrConfig::TQueryServiceConfig& queryServiceConfig, const TMaybe<TString>& applicationName = Nothing(), const NKikimr::NMiniKQL::IFunctionRegistry* funcRegistry = nullptr,
bool keepConfigChanges = false, bool isInternalCall = false, TKqpTempTablesState::TConstPtr tempTablesState = nullptr,
NActors::TActorSystem* actorSystem = nullptr /*take from TLS by default*/,
NYql::TExprContext* ctx = nullptr);
NYql::TExprContext* ctx = nullptr, const TIntrusivePtr<TUserRequestContext>& userRequestContext = nullptr);

} // namespace NKqp
} // namespace NKikimr
2 changes: 1 addition & 1 deletion ydb/core/kqp/host/kqp_runner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ class TKqpRunner : public IKqpRunner {
, Config(sessionCtx->ConfigPtr())
, TransformCtx(transformCtx)
, OptimizeCtx(MakeIntrusive<TKqpOptimizeContext>(cluster, Config, sessionCtx->QueryPtr(),
sessionCtx->TablesPtr()))
sessionCtx->TablesPtr(), sessionCtx->GetUserRequestContext()))
, BuildQueryCtx(MakeIntrusive<TKqpBuildQueryContext>())
, Pctx(TKqpProviderContext(*OptimizeCtx, Config->CostBasedOptimizationLevel.Get().GetOrElse(Config->DefaultCostBasedOptimizationLevel)))
{
Expand Down
5 changes: 4 additions & 1 deletion ydb/core/kqp/opt/kqp_opt.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,13 @@ namespace NKikimr::NKqp::NOpt {

struct TKqpOptimizeContext : public TSimpleRefCount<TKqpOptimizeContext> {
TKqpOptimizeContext(const TString& cluster, const NYql::TKikimrConfiguration::TPtr& config,
const TIntrusivePtr<NYql::TKikimrQueryContext> queryCtx, const TIntrusivePtr<NYql::TKikimrTablesData>& tables)
const TIntrusivePtr<NYql::TKikimrQueryContext> queryCtx, const TIntrusivePtr<NYql::TKikimrTablesData>& tables,
const TIntrusivePtr<NKikimr::NKqp::TUserRequestContext>& userRequestContext)
: Cluster(cluster)
, Config(config)
, QueryCtx(queryCtx)
, Tables(tables)
, UserRequestContext(userRequestContext)
{
YQL_ENSURE(QueryCtx);
YQL_ENSURE(Tables);
Expand All @@ -24,6 +26,7 @@ struct TKqpOptimizeContext : public TSimpleRefCount<TKqpOptimizeContext> {
const NYql::TKikimrConfiguration::TPtr Config;
const TIntrusivePtr<NYql::TKikimrQueryContext> QueryCtx;
const TIntrusivePtr<NYql::TKikimrTablesData> Tables;
const TIntrusivePtr<NKikimr::NKqp::TUserRequestContext> UserRequestContext;
int JoinsCount{};
int EquiJoinsCount{};
std::shared_ptr<NJson::TJsonValue> OverrideStatistics{};
Expand Down
19 changes: 16 additions & 3 deletions ydb/core/kqp/opt/kqp_query_plan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2384,11 +2384,21 @@ void PhyQuerySetTxPlans(NKqpProto::TKqpPhyQuery& queryProto, const TKqpPhysicalQ
txPlans.emplace_back(phyTx.GetPlan());
}

TString queryStats = "";
if (optCtx && optCtx->UserRequestContext && optCtx->UserRequestContext->PoolId) {
NJsonWriter::TBuf writer;
writer.BeginObject();
writer.WriteKey("ResourcePoolId").WriteString(optCtx->UserRequestContext->PoolId);
writer.EndObject();

queryStats = writer.Str();
}

NJsonWriter::TBuf writer;
writer.SetIndentSpaces(2);
WriteCommonTablesInfo(writer, serializerCtx.Tables);

queryProto.SetQueryPlan(SerializeTxPlans(txPlans, optCtx, writer.Str()));
queryProto.SetQueryPlan(SerializeTxPlans(txPlans, optCtx, writer.Str(), queryStats));
}

void FillAggrStat(NJson::TJsonValue& node, const NYql::NDqProto::TDqStatsAggr& aggr, const TString& name) {
Expand Down Expand Up @@ -2740,7 +2750,7 @@ TString AddExecStatsToTxPlan(const TString& txPlanJson, const NYql::NDqProto::TD
return AddExecStatsToTxPlan(txPlanJson, stats, TIntrusivePtr<NOpt::TKqpOptimizeContext>());
}

TString SerializeAnalyzePlan(const NKqpProto::TKqpStatsQuery& queryStats) {
TString SerializeAnalyzePlan(const NKqpProto::TKqpStatsQuery& queryStats, const TString& poolId) {
TVector<const TString> txPlans;
for (const auto& execStats: queryStats.GetExecutions()) {
for (const auto& txPlan: execStats.GetTxPlansWithStats()) {
Expand All @@ -2764,7 +2774,10 @@ TString SerializeAnalyzePlan(const NKqpProto::TKqpStatsQuery& queryStats) {

writer.WriteKey("ProcessCpuTimeUs").WriteLongLong(queryStats.GetWorkerCpuTimeUs());
writer.WriteKey("TotalDurationUs").WriteLongLong(queryStats.GetDurationUs());
writer.WriteKey("QueuedTimeUs").WriteLongLong(queryStats.GetQueuedTimeUs());
if (poolId) {
writer.WriteKey("QueuedTimeUs").WriteLongLong(queryStats.GetQueuedTimeUs());
writer.WriteKey("ResourcePoolId").WriteString(poolId);
}
writer.EndObject();

return SerializeTxPlans(txPlans, TIntrusivePtr<NOpt::TKqpOptimizeContext>(), "", writer.Str());
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/kqp/opt/kqp_query_plan.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ void PhyQuerySetTxPlans(NKqpProto::TKqpPhyQuery& queryProto, const NYql::NNodes:
*/
TString AddExecStatsToTxPlan(const TString& txPlan, const NYql::NDqProto::TDqExecutionStats& stats);

TString SerializeAnalyzePlan(const NKqpProto::TKqpStatsQuery& queryStats);
TString SerializeAnalyzePlan(const NKqpProto::TKqpStatsQuery& queryStats, const TString& poolId = "");

TString SerializeScriptPlan(const TVector<const TString>& queryPlans);

Expand Down
10 changes: 9 additions & 1 deletion ydb/core/kqp/provider/yql_kikimr_provider.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

#include <ydb/core/base/path.h>
#include <ydb/core/external_sources/external_source_factory.h>
#include <ydb/core/kqp/common/kqp_user_request_context.h>
#include <ydb/core/kqp/common/simple/temp_tables.h>
#include <ydb/core/kqp/query_data/kqp_query_data.h>
#include <ydb/library/yql/ast/yql_gc_nodes.h>
Expand Down Expand Up @@ -443,12 +444,14 @@ class TKikimrSessionContext : public TThrRefBase {
TIntrusivePtr<ITimeProvider> timeProvider,
TIntrusivePtr<IRandomProvider> randomProvider,
const TIntrusiveConstPtr<NACLib::TUserToken>& userToken,
TIntrusivePtr<TKikimrTransactionContextBase> txCtx = nullptr)
TIntrusivePtr<TKikimrTransactionContextBase> txCtx = nullptr,
const TIntrusivePtr<NKikimr::NKqp::TUserRequestContext>& userRequestContext = nullptr)
: Configuration(config)
, TablesData(MakeIntrusive<TKikimrTablesData>())
, QueryCtx(MakeIntrusive<TKikimrQueryContext>(functionRegistry, timeProvider, randomProvider))
, TxCtx(txCtx)
, UserToken(userToken)
, UserRequestContext(userRequestContext)
{}

TKikimrSessionContext(const TKikimrSessionContext&) = delete;
Expand Down Expand Up @@ -530,6 +533,10 @@ class TKikimrSessionContext : public TThrRefBase {
return UserToken;
}

const TIntrusivePtr<NKikimr::NKqp::TUserRequestContext>& GetUserRequestContext() const {
return UserRequestContext;
}

private:
TString UserName;
TString Cluster;
Expand All @@ -541,6 +548,7 @@ class TKikimrSessionContext : public TThrRefBase {
TIntrusivePtr<TKikimrTransactionContextBase> TxCtx;
NKikimr::NKqp::TKqpTempTablesState::TConstPtr TempTablesState;
TIntrusiveConstPtr<NACLib::TUserToken> UserToken;
TIntrusivePtr<NKikimr::NKqp::TUserRequestContext> UserRequestContext;
};

TIntrusivePtr<IDataProvider> CreateKikimrDataSource(
Expand Down
27 changes: 22 additions & 5 deletions ydb/core/kqp/proxy_service/kqp_proxy_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,7 @@ class TKqpProxyService : public TActorBootstrapped<TKqpProxyService> {
IEventHandle::FlagTrackDelivery);

WhiteBoardService = NNodeWhiteboard::MakeNodeWhiteboardServiceId(SelfId().NodeId());
ResourcePoolsCache.UpdateFeatureFlags(FeatureFlags, ActorContext());

if (auto& cfg = TableServiceConfig.GetSpillingServiceConfig().GetLocalFileConfig(); cfg.GetEnable()) {
TString spillingRoot = cfg.GetRoot();
Expand Down Expand Up @@ -482,6 +483,8 @@ class TKqpProxyService : public TActorBootstrapped<TKqpProxyService> {
Send(TActivationContext::InterconnectProxy(node), new TEvents::TEvUnsubscribe);
});

ResourcePoolsCache.UnsubscribeFromResourcePoolClassifiers(ActorContext());

return TActor::PassAway();
}

Expand All @@ -499,6 +502,7 @@ class TKqpProxyService : public TActorBootstrapped<TKqpProxyService> {
UpdateYqlLogLevels();

FeatureFlags.Swap(event.MutableConfig()->MutableFeatureFlags());
ResourcePoolsCache.UpdateFeatureFlags(FeatureFlags, ActorContext());

auto responseEv = MakeHolder<NConsole::TEvConsole::TEvConfigNotificationResponse>(event);
Send(ev->Sender, responseEv.Release(), IEventHandle::FlagTrackDelivery, ev->Cookie);
Expand Down Expand Up @@ -1353,6 +1357,8 @@ class TKqpProxyService : public TActorBootstrapped<TKqpProxyService> {
hFunc(TEvKqp::TEvListSessionsRequest, Handle);
hFunc(TEvKqp::TEvListProxyNodesRequest, Handle);
hFunc(NWorkload::TEvUpdatePoolInfo, Handle);
hFunc(NWorkload::TEvUpdateDatabaseInfo, Handle);
hFunc(NMetadata::NProvider::TEvRefreshSubscriberData, Handle);
default:
Y_ABORT("TKqpProxyService: unexpected event type: %" PRIx32 " event: %s",
ev->GetTypeRewrite(), ev->ToString().data());
Expand Down Expand Up @@ -1561,23 +1567,26 @@ class TKqpProxyService : public TActorBootstrapped<TKqpProxyService> {
}

bool TryFillPoolInfoFromCache(TEvKqp::TEvQueryRequest::TPtr& ev, ui64 requestId) {
if (!FeatureFlags.GetEnableResourcePools()) {
ResourcePoolsCache.UpdateFeatureFlags(FeatureFlags, ActorContext());

const auto& database = ev->Get()->GetDatabase();
if (!ResourcePoolsCache.ResourcePoolsEnabled(database)) {
ev->Get()->SetPoolId("");
return true;
}

const auto& userToken = ev->Get()->GetUserToken();
if (!ev->Get()->GetPoolId()) {
ev->Get()->SetPoolId(NResourcePool::DEFAULT_POOL_ID);
ev->Get()->SetPoolId(ResourcePoolsCache.GetPoolId(database, userToken, ActorContext()));
}

const auto& poolId = ev->Get()->GetPoolId();
const auto& poolInfo = ResourcePoolsCache.GetPoolInfo(ev->Get()->GetDatabase(), poolId);
const auto& poolInfo = ResourcePoolsCache.GetPoolInfo(database, poolId, ActorContext());
if (!poolInfo) {
return true;
}

const auto& securityObject = poolInfo->SecurityObject;
const auto& userToken = ev->Get()->GetUserToken();
if (securityObject && userToken && !userToken->GetSerializedToken().empty()) {
if (!securityObject->CheckAccess(NACLib::EAccessRights::DescribeSchema, *userToken)) {
ReplyProcessError(Ydb::StatusIds::NOT_FOUND, TStringBuilder() << "Resource pool " << poolId << " not found or you don't have access permissions", requestId);
Expand Down Expand Up @@ -1783,7 +1792,15 @@ class TKqpProxyService : public TActorBootstrapped<TKqpProxyService> {
}

void Handle(NWorkload::TEvUpdatePoolInfo::TPtr& ev) {
ResourcePoolsCache.UpdatePoolInfo(ev->Get()->Database, ev->Get()->PoolId, ev->Get()->Config, ev->Get()->SecurityObject);
ResourcePoolsCache.UpdatePoolInfo(ev->Get()->Database, ev->Get()->PoolId, ev->Get()->Config, ev->Get()->SecurityObject, ActorContext());
}

void Handle(NWorkload::TEvUpdateDatabaseInfo::TPtr& ev) {
ResourcePoolsCache.UpdateDatabaseInfo(ev->Get()->Database, ev->Get()->Serverless);
}

void Handle(NMetadata::NProvider::TEvRefreshSubscriberData::TPtr& ev) {
ResourcePoolsCache.UpdateResourcePoolClassifiersInfo(ev->Get()->GetSnapshotAs<TResourcePoolClassifierSnapshot>(), ActorContext());
}

private:
Expand Down
Loading