diff --git a/ydb/core/kqp/common/events/workload_service.h b/ydb/core/kqp/common/events/workload_service.h index 514579bfcb24..c1d36a957a76 100644 --- a/ydb/core/kqp/common/events/workload_service.h +++ b/ydb/core/kqp/common/events/workload_service.h @@ -12,6 +12,16 @@ namespace NKikimr::NKqp::NWorkload { +struct TEvSubscribeOnPoolChanges : public NActors::TEventLocal { + TEvSubscribeOnPoolChanges(const TString& database, const TString& poolId) + : Database(database) + , PoolId(poolId) + {} + + const TString Database; + const TString PoolId; +}; + struct TEvPlaceRequestIntoPool : public NActors::TEventLocal { TEvPlaceRequestIntoPool(const TString& database, const TString& sessionId, const TString& poolId, TIntrusiveConstPtr userToken) : Database(database) @@ -80,4 +90,14 @@ struct TEvUpdatePoolInfo : public NActors::TEventLocal SecurityObject; }; +struct TEvUpdateDatabaseInfo : public NActors::TEventLocal { + TEvUpdateDatabaseInfo(const TString& database, bool serverless) + : Database(database) + , Serverless(serverless) + {} + + const TString Database; + const bool Serverless; +}; + } // NKikimr::NKqp::NWorkload diff --git a/ydb/core/kqp/common/simple/kqp_event_ids.h b/ydb/core/kqp/common/simple/kqp_event_ids.h index b0002f332bd2..8c4b3fcab29a 100644 --- a/ydb/core/kqp/common/simple/kqp_event_ids.h +++ b/ydb/core/kqp/common/simple/kqp_event_ids.h @@ -175,6 +175,8 @@ struct TKqpWorkloadServiceEvents { EvCleanupRequest, EvCleanupResponse, EvUpdatePoolInfo, + EvUpdateDatabaseInfo, + EvSubscribeOnPoolChanges, }; }; diff --git a/ydb/core/kqp/compile_service/kqp_compile_actor.cpp b/ydb/core/kqp/compile_service/kqp_compile_actor.cpp index 8e112a3b0f36..d01b8ad45051 100644 --- a/ydb/core/kqp/compile_service/kqp_compile_actor.cpp +++ b/ydb/core/kqp/compile_service/kqp_compile_actor.cpp @@ -275,7 +275,7 @@ class TKqpCompileActor : public TActorBootstrapped { KqpHost = CreateKqpHost(Gateway, QueryId.Cluster, QueryId.Database, Config, ModuleResolverState->ModuleResolver, FederatedQuerySetup, UserToken, GUCSettings, QueryServiceConfig, ApplicationName, AppData(ctx)->FunctionRegistry, - false, false, std::move(TempTablesState), nullptr, SplitCtx); + false, false, std::move(TempTablesState), nullptr, SplitCtx, UserRequestContext); IKqpHost::TPrepareSettings prepareSettings; prepareSettings.DocumentApiRestricted = QueryId.Settings.DocumentApiRestricted; diff --git a/ydb/core/kqp/gateway/behaviour/resource_pool_classifier/object.cpp b/ydb/core/kqp/gateway/behaviour/resource_pool_classifier/object.cpp index 13ac10b1f26c..b34541962d70 100644 --- a/ydb/core/kqp/gateway/behaviour/resource_pool_classifier/object.cpp +++ b/ydb/core/kqp/gateway/behaviour/resource_pool_classifier/object.cpp @@ -123,7 +123,7 @@ NJson::TJsonValue TResourcePoolClassifierConfig::GetDebugJson() const { } bool TResourcePoolClassifierConfig::operator==(const TResourcePoolClassifierConfig& other) const { - return std::tie(Database, Name, Rank, ConfigJson) != std::tie(other.Database, other.Name, other.Rank, other.ConfigJson); + return std::tie(Database, Name, Rank, ConfigJson) == std::tie(other.Database, other.Name, other.Rank, other.ConfigJson); } NMetadata::IClassBehaviour::TPtr TResourcePoolClassifierConfig::GetBehaviour() { diff --git a/ydb/core/kqp/host/kqp_host.cpp b/ydb/core/kqp/host/kqp_host.cpp index 5f854fa7cef5..d5f43c408182 100644 --- a/ydb/core/kqp/host/kqp_host.cpp +++ b/ydb/core/kqp/host/kqp_host.cpp @@ -1033,7 +1033,8 @@ class TKqpHost : public IKqpHost { std::optional federatedQuerySetup, const TIntrusiveConstPtr& userToken, const NKikimr::NMiniKQL::IFunctionRegistry* funcRegistry, bool keepConfigChanges, bool isInternalCall, TKqpTempTablesState::TConstPtr tempTablesState = nullptr, NActors::TActorSystem* actorSystem = nullptr, - NYql::TExprContext* ctx = nullptr, const NKikimrConfig::TQueryServiceConfig& queryServiceConfig = NKikimrConfig::TQueryServiceConfig()) + NYql::TExprContext* ctx = nullptr, const NKikimrConfig::TQueryServiceConfig& queryServiceConfig = NKikimrConfig::TQueryServiceConfig(), + const TIntrusivePtr& userRequestContext = nullptr) : Gateway(gateway) , Cluster(cluster) , GUCSettings(gUCSettings) @@ -1044,7 +1045,7 @@ class TKqpHost : public IKqpHost { , KeepConfigChanges(keepConfigChanges) , IsInternalCall(isInternalCall) , FederatedQuerySetup(federatedQuerySetup) - , SessionCtx(new TKikimrSessionContext(funcRegistry, config, TAppData::TimeProvider, TAppData::RandomProvider, userToken)) + , SessionCtx(new TKikimrSessionContext(funcRegistry, config, TAppData::TimeProvider, TAppData::RandomProvider, userToken, nullptr, userRequestContext)) , Config(config) , TypesCtx(MakeIntrusive()) , PlanBuilder(CreatePlanBuilder(*TypesCtx)) @@ -1958,10 +1959,10 @@ TIntrusivePtr CreateKqpHost(TIntrusivePtr gateway, const const TString& database, TKikimrConfiguration::TPtr config, IModuleResolver::TPtr moduleResolver, std::optional federatedQuerySetup, const TIntrusiveConstPtr& userToken, const TGUCSettings::TPtr& gUCSettings, const NKikimrConfig::TQueryServiceConfig& queryServiceConfig, const TMaybe& applicationName, const NKikimr::NMiniKQL::IFunctionRegistry* funcRegistry, bool keepConfigChanges, - bool isInternalCall, TKqpTempTablesState::TConstPtr tempTablesState, NActors::TActorSystem* actorSystem, NYql::TExprContext* ctx) + bool isInternalCall, TKqpTempTablesState::TConstPtr tempTablesState, NActors::TActorSystem* actorSystem, NYql::TExprContext* ctx, const TIntrusivePtr& userRequestContext) { return MakeIntrusive(gateway, cluster, database, gUCSettings, applicationName, config, moduleResolver, federatedQuerySetup, userToken, funcRegistry, - keepConfigChanges, isInternalCall, std::move(tempTablesState), actorSystem, ctx, queryServiceConfig); + keepConfigChanges, isInternalCall, std::move(tempTablesState), actorSystem, ctx, queryServiceConfig, userRequestContext); } } // namespace NKqp diff --git a/ydb/core/kqp/host/kqp_host.h b/ydb/core/kqp/host/kqp_host.h index 85a7025a9e1b..5af2b9a03afc 100644 --- a/ydb/core/kqp/host/kqp_host.h +++ b/ydb/core/kqp/host/kqp_host.h @@ -123,7 +123,7 @@ TIntrusivePtr CreateKqpHost(TIntrusivePtr gateway, const NKikimrConfig::TQueryServiceConfig& queryServiceConfig, const TMaybe& applicationName = Nothing(), const NKikimr::NMiniKQL::IFunctionRegistry* funcRegistry = nullptr, bool keepConfigChanges = false, bool isInternalCall = false, TKqpTempTablesState::TConstPtr tempTablesState = nullptr, NActors::TActorSystem* actorSystem = nullptr /*take from TLS by default*/, - NYql::TExprContext* ctx = nullptr); + NYql::TExprContext* ctx = nullptr, const TIntrusivePtr& userRequestContext = nullptr); } // namespace NKqp } // namespace NKikimr diff --git a/ydb/core/kqp/host/kqp_runner.cpp b/ydb/core/kqp/host/kqp_runner.cpp index 3990f35e8be7..7db310659d97 100644 --- a/ydb/core/kqp/host/kqp_runner.cpp +++ b/ydb/core/kqp/host/kqp_runner.cpp @@ -146,7 +146,7 @@ class TKqpRunner : public IKqpRunner { , Config(sessionCtx->ConfigPtr()) , TransformCtx(transformCtx) , OptimizeCtx(MakeIntrusive(cluster, Config, sessionCtx->QueryPtr(), - sessionCtx->TablesPtr())) + sessionCtx->TablesPtr(), sessionCtx->GetUserRequestContext())) , BuildQueryCtx(MakeIntrusive()) , Pctx(TKqpProviderContext(*OptimizeCtx, Config->CostBasedOptimizationLevel.Get().GetOrElse(Config->DefaultCostBasedOptimizationLevel))) { diff --git a/ydb/core/kqp/opt/kqp_opt.h b/ydb/core/kqp/opt/kqp_opt.h index 30dda2a66aa7..9db41c2f79e8 100644 --- a/ydb/core/kqp/opt/kqp_opt.h +++ b/ydb/core/kqp/opt/kqp_opt.h @@ -10,11 +10,13 @@ namespace NKikimr::NKqp::NOpt { struct TKqpOptimizeContext : public TSimpleRefCount { TKqpOptimizeContext(const TString& cluster, const NYql::TKikimrConfiguration::TPtr& config, - const TIntrusivePtr queryCtx, const TIntrusivePtr& tables) + const TIntrusivePtr queryCtx, const TIntrusivePtr& tables, + const TIntrusivePtr& userRequestContext) : Cluster(cluster) , Config(config) , QueryCtx(queryCtx) , Tables(tables) + , UserRequestContext(userRequestContext) { YQL_ENSURE(QueryCtx); YQL_ENSURE(Tables); @@ -24,6 +26,7 @@ struct TKqpOptimizeContext : public TSimpleRefCount { const NYql::TKikimrConfiguration::TPtr Config; const TIntrusivePtr QueryCtx; const TIntrusivePtr Tables; + const TIntrusivePtr UserRequestContext; int JoinsCount{}; int EquiJoinsCount{}; std::shared_ptr OverrideStatistics{}; diff --git a/ydb/core/kqp/opt/kqp_query_plan.cpp b/ydb/core/kqp/opt/kqp_query_plan.cpp index 3fc634748ead..d418ebc7fc42 100644 --- a/ydb/core/kqp/opt/kqp_query_plan.cpp +++ b/ydb/core/kqp/opt/kqp_query_plan.cpp @@ -2384,11 +2384,21 @@ void PhyQuerySetTxPlans(NKqpProto::TKqpPhyQuery& queryProto, const TKqpPhysicalQ txPlans.emplace_back(phyTx.GetPlan()); } + TString queryStats = ""; + if (optCtx && optCtx->UserRequestContext && optCtx->UserRequestContext->PoolId) { + NJsonWriter::TBuf writer; + writer.BeginObject(); + writer.WriteKey("ResourcePoolId").WriteString(optCtx->UserRequestContext->PoolId); + writer.EndObject(); + + queryStats = writer.Str(); + } + NJsonWriter::TBuf writer; writer.SetIndentSpaces(2); WriteCommonTablesInfo(writer, serializerCtx.Tables); - queryProto.SetQueryPlan(SerializeTxPlans(txPlans, optCtx, writer.Str())); + queryProto.SetQueryPlan(SerializeTxPlans(txPlans, optCtx, writer.Str(), queryStats)); } void FillAggrStat(NJson::TJsonValue& node, const NYql::NDqProto::TDqStatsAggr& aggr, const TString& name) { @@ -2740,7 +2750,7 @@ TString AddExecStatsToTxPlan(const TString& txPlanJson, const NYql::NDqProto::TD return AddExecStatsToTxPlan(txPlanJson, stats, TIntrusivePtr()); } -TString SerializeAnalyzePlan(const NKqpProto::TKqpStatsQuery& queryStats) { +TString SerializeAnalyzePlan(const NKqpProto::TKqpStatsQuery& queryStats, const TString& poolId) { TVector txPlans; for (const auto& execStats: queryStats.GetExecutions()) { for (const auto& txPlan: execStats.GetTxPlansWithStats()) { @@ -2764,7 +2774,10 @@ TString SerializeAnalyzePlan(const NKqpProto::TKqpStatsQuery& queryStats) { writer.WriteKey("ProcessCpuTimeUs").WriteLongLong(queryStats.GetWorkerCpuTimeUs()); writer.WriteKey("TotalDurationUs").WriteLongLong(queryStats.GetDurationUs()); - writer.WriteKey("QueuedTimeUs").WriteLongLong(queryStats.GetQueuedTimeUs()); + if (poolId) { + writer.WriteKey("QueuedTimeUs").WriteLongLong(queryStats.GetQueuedTimeUs()); + writer.WriteKey("ResourcePoolId").WriteString(poolId); + } writer.EndObject(); return SerializeTxPlans(txPlans, TIntrusivePtr(), "", writer.Str()); diff --git a/ydb/core/kqp/opt/kqp_query_plan.h b/ydb/core/kqp/opt/kqp_query_plan.h index 8adbf2b20866..7f720dc4c6ca 100644 --- a/ydb/core/kqp/opt/kqp_query_plan.h +++ b/ydb/core/kqp/opt/kqp_query_plan.h @@ -44,7 +44,7 @@ void PhyQuerySetTxPlans(NKqpProto::TKqpPhyQuery& queryProto, const NYql::NNodes: */ TString AddExecStatsToTxPlan(const TString& txPlan, const NYql::NDqProto::TDqExecutionStats& stats); -TString SerializeAnalyzePlan(const NKqpProto::TKqpStatsQuery& queryStats); +TString SerializeAnalyzePlan(const NKqpProto::TKqpStatsQuery& queryStats, const TString& poolId = ""); TString SerializeScriptPlan(const TVector& queryPlans); diff --git a/ydb/core/kqp/provider/yql_kikimr_provider.h b/ydb/core/kqp/provider/yql_kikimr_provider.h index 0c5987fce759..dd39b33a77d9 100644 --- a/ydb/core/kqp/provider/yql_kikimr_provider.h +++ b/ydb/core/kqp/provider/yql_kikimr_provider.h @@ -5,6 +5,7 @@ #include #include +#include #include #include #include @@ -443,12 +444,14 @@ class TKikimrSessionContext : public TThrRefBase { TIntrusivePtr timeProvider, TIntrusivePtr randomProvider, const TIntrusiveConstPtr& userToken, - TIntrusivePtr txCtx = nullptr) + TIntrusivePtr txCtx = nullptr, + const TIntrusivePtr& userRequestContext = nullptr) : Configuration(config) , TablesData(MakeIntrusive()) , QueryCtx(MakeIntrusive(functionRegistry, timeProvider, randomProvider)) , TxCtx(txCtx) , UserToken(userToken) + , UserRequestContext(userRequestContext) {} TKikimrSessionContext(const TKikimrSessionContext&) = delete; @@ -530,6 +533,10 @@ class TKikimrSessionContext : public TThrRefBase { return UserToken; } + const TIntrusivePtr& GetUserRequestContext() const { + return UserRequestContext; + } + private: TString UserName; TString Cluster; @@ -541,6 +548,7 @@ class TKikimrSessionContext : public TThrRefBase { TIntrusivePtr TxCtx; NKikimr::NKqp::TKqpTempTablesState::TConstPtr TempTablesState; TIntrusiveConstPtr UserToken; + TIntrusivePtr UserRequestContext; }; TIntrusivePtr CreateKikimrDataSource( diff --git a/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp b/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp index 58908e23ed55..2285b9ef537e 100644 --- a/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp +++ b/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp @@ -234,6 +234,7 @@ class TKqpProxyService : public TActorBootstrapped { IEventHandle::FlagTrackDelivery); WhiteBoardService = NNodeWhiteboard::MakeNodeWhiteboardServiceId(SelfId().NodeId()); + ResourcePoolsCache.UpdateFeatureFlags(FeatureFlags, ActorContext()); if (auto& cfg = TableServiceConfig.GetSpillingServiceConfig().GetLocalFileConfig(); cfg.GetEnable()) { TString spillingRoot = cfg.GetRoot(); @@ -482,6 +483,8 @@ class TKqpProxyService : public TActorBootstrapped { Send(TActivationContext::InterconnectProxy(node), new TEvents::TEvUnsubscribe); }); + ResourcePoolsCache.UnsubscribeFromResourcePoolClassifiers(ActorContext()); + return TActor::PassAway(); } @@ -499,6 +502,7 @@ class TKqpProxyService : public TActorBootstrapped { UpdateYqlLogLevels(); FeatureFlags.Swap(event.MutableConfig()->MutableFeatureFlags()); + ResourcePoolsCache.UpdateFeatureFlags(FeatureFlags, ActorContext()); auto responseEv = MakeHolder(event); Send(ev->Sender, responseEv.Release(), IEventHandle::FlagTrackDelivery, ev->Cookie); @@ -1353,6 +1357,8 @@ class TKqpProxyService : public TActorBootstrapped { hFunc(TEvKqp::TEvListSessionsRequest, Handle); hFunc(TEvKqp::TEvListProxyNodesRequest, Handle); hFunc(NWorkload::TEvUpdatePoolInfo, Handle); + hFunc(NWorkload::TEvUpdateDatabaseInfo, Handle); + hFunc(NMetadata::NProvider::TEvRefreshSubscriberData, Handle); default: Y_ABORT("TKqpProxyService: unexpected event type: %" PRIx32 " event: %s", ev->GetTypeRewrite(), ev->ToString().data()); @@ -1561,23 +1567,26 @@ class TKqpProxyService : public TActorBootstrapped { } bool TryFillPoolInfoFromCache(TEvKqp::TEvQueryRequest::TPtr& ev, ui64 requestId) { - if (!FeatureFlags.GetEnableResourcePools()) { + ResourcePoolsCache.UpdateFeatureFlags(FeatureFlags, ActorContext()); + + const auto& database = ev->Get()->GetDatabase(); + if (!ResourcePoolsCache.ResourcePoolsEnabled(database)) { ev->Get()->SetPoolId(""); return true; } + const auto& userToken = ev->Get()->GetUserToken(); if (!ev->Get()->GetPoolId()) { - ev->Get()->SetPoolId(NResourcePool::DEFAULT_POOL_ID); + ev->Get()->SetPoolId(ResourcePoolsCache.GetPoolId(database, userToken, ActorContext())); } const auto& poolId = ev->Get()->GetPoolId(); - const auto& poolInfo = ResourcePoolsCache.GetPoolInfo(ev->Get()->GetDatabase(), poolId); + const auto& poolInfo = ResourcePoolsCache.GetPoolInfo(database, poolId, ActorContext()); if (!poolInfo) { return true; } const auto& securityObject = poolInfo->SecurityObject; - const auto& userToken = ev->Get()->GetUserToken(); if (securityObject && userToken && !userToken->GetSerializedToken().empty()) { if (!securityObject->CheckAccess(NACLib::EAccessRights::DescribeSchema, *userToken)) { ReplyProcessError(Ydb::StatusIds::NOT_FOUND, TStringBuilder() << "Resource pool " << poolId << " not found or you don't have access permissions", requestId); @@ -1783,7 +1792,15 @@ class TKqpProxyService : public TActorBootstrapped { } void Handle(NWorkload::TEvUpdatePoolInfo::TPtr& ev) { - ResourcePoolsCache.UpdatePoolInfo(ev->Get()->Database, ev->Get()->PoolId, ev->Get()->Config, ev->Get()->SecurityObject); + ResourcePoolsCache.UpdatePoolInfo(ev->Get()->Database, ev->Get()->PoolId, ev->Get()->Config, ev->Get()->SecurityObject, ActorContext()); + } + + void Handle(NWorkload::TEvUpdateDatabaseInfo::TPtr& ev) { + ResourcePoolsCache.UpdateDatabaseInfo(ev->Get()->Database, ev->Get()->Serverless); + } + + void Handle(NMetadata::NProvider::TEvRefreshSubscriberData::TPtr& ev) { + ResourcePoolsCache.UpdateResourcePoolClassifiersInfo(ev->Get()->GetSnapshotAs(), ActorContext()); } private: diff --git a/ydb/core/kqp/proxy_service/kqp_proxy_service_impl.h b/ydb/core/kqp/proxy_service/kqp_proxy_service_impl.h index e5023b09ed46..04f1052fa3fa 100644 --- a/ydb/core/kqp/proxy_service/kqp_proxy_service_impl.h +++ b/ydb/core/kqp/proxy_service/kqp_proxy_service_impl.h @@ -3,7 +3,9 @@ #include #include #include +#include #include +#include #include #include @@ -417,39 +419,223 @@ class TLocalSessionsRegistry { }; class TResourcePoolsCache { + struct TClassifierInfo { + const TString Membername; + const TString PoolId; + + TClassifierInfo(const NResourcePool::TClassifierSettings& classifierSettings) + : Membername(classifierSettings.Membername) + , PoolId(classifierSettings.ResourcePool) + {} + }; + + struct TDatabaseInfo { + std::unordered_map ResourcePoolsClassifiers = {}; + std::map RankToClassifierInfo = {}; + std::unordered_map UserToResourcePool = {}; + bool Serverless = false; + }; + struct TPoolInfo { NResourcePool::TPoolSettings Config; std::optional SecurityObject; + bool Expired = false; }; public: - std::optional GetPoolInfo(const TString& database, const TString& poolId) const { + bool ResourcePoolsEnabled(const TString& database) const { + if (!EnableResourcePools) { + return false; + } + + if (EnableResourcePoolsOnServerless) { + return true; + } + + const auto databaseInfo = GetDatabaseInfo(database); + return !databaseInfo || !databaseInfo->Serverless; + } + + TString GetPoolId(const TString& database, const TIntrusiveConstPtr& userToken, TActorContext actorContext) { + if (!userToken || userToken->GetUserSID().empty()) { + return NResourcePool::DEFAULT_POOL_ID; + } + + TDatabaseInfo& databaseInfo = *GetOrCreateDatabaseInfo(database); + if (const auto& poolId = GetPoolIdFromClassifiers(database, userToken->GetUserSID(), databaseInfo, userToken, actorContext)) { + return poolId; + } + for (const auto& userSID : userToken->GetGroupSIDs()) { + if (const auto& poolId = GetPoolIdFromClassifiers(database, userSID, databaseInfo, userToken, actorContext)) { + return poolId; + } + } + + return NResourcePool::DEFAULT_POOL_ID; + } + + std::optional GetPoolInfo(const TString& database, const TString& poolId, TActorContext actorContext) const { auto it = PoolsCache.find(GetPoolKey(database, poolId)); if (it == PoolsCache.end()) { + actorContext.Send(MakeKqpWorkloadServiceId(actorContext.SelfID.NodeId()), new NWorkload::TEvSubscribeOnPoolChanges(database, poolId)); return std::nullopt; } return it->second; } - void UpdatePoolInfo(const TString& database, const TString& poolId, const std::optional& config, const std::optional& securityObject) { + void UpdateFeatureFlags(const NKikimrConfig::TFeatureFlags& featureFlags, TActorContext actorContext) { + EnableResourcePools = featureFlags.GetEnableResourcePools(); + EnableResourcePoolsOnServerless = featureFlags.GetEnableResourcePoolsOnServerless(); + UpdateResourcePoolClassifiersSubscription(actorContext); + } + + void UpdateDatabaseInfo(const TString& database, bool serverless) { + GetOrCreateDatabaseInfo(database)->Serverless = serverless; + } + + void UpdatePoolInfo(const TString& database, const TString& poolId, const std::optional& config, const std::optional& securityObject, TActorContext actorContext) { + bool clearClassifierCache = false; + const TString& poolKey = GetPoolKey(database, poolId); if (!config) { - PoolsCache.erase(poolKey); - return; + auto it = PoolsCache.find(poolKey); + if (it == PoolsCache.end()) { + return; + } + if (it->second.Expired) { + // Pool was dropped + clearClassifierCache = true; + PoolsCache.erase(it); + } else { + // Refresh pool subscription + it->second.Expired = true; + actorContext.Send(MakeKqpWorkloadServiceId(actorContext.SelfID.NodeId()), new NWorkload::TEvSubscribeOnPoolChanges(database, poolId)); + } + } else { + auto& poolInfo = PoolsCache[poolKey]; + clearClassifierCache = poolInfo.SecurityObject != securityObject; + poolInfo.Config = *config; + poolInfo.SecurityObject = securityObject; + poolInfo.Expired = false; + } + + if (clearClassifierCache) { + GetOrCreateDatabaseInfo(database)->UserToResourcePool.clear(); } + } - auto& poolInfo = PoolsCache[poolKey]; - poolInfo.Config = *config; - poolInfo.SecurityObject = securityObject; + void UpdateResourcePoolClassifiersInfo(const TResourcePoolClassifierSnapshot* snapsot, TActorContext actorContext) { + auto resourcePoolClassifierConfigs = snapsot->GetResourcePoolClassifierConfigs(); + for (auto& [database, databaseInfo] : DatabasesCache) { + auto it = resourcePoolClassifierConfigs.find(database); + if (it != resourcePoolClassifierConfigs.end()) { + UpdateDatabaseResourcePoolClassifiers(database, databaseInfo, std::move(it->second), actorContext); + resourcePoolClassifierConfigs.erase(it); + } else if (!databaseInfo.ResourcePoolsClassifiers.empty()) { + databaseInfo.ResourcePoolsClassifiers.clear(); + databaseInfo.RankToClassifierInfo.clear(); + databaseInfo.UserToResourcePool.clear(); + } + } + for (auto& [database, configsMap] : resourcePoolClassifierConfigs) { + UpdateDatabaseResourcePoolClassifiers(database, *GetOrCreateDatabaseInfo(database), std::move(configsMap), actorContext); + } + } + + void UnsubscribeFromResourcePoolClassifiers(TActorContext actorContext) { + if (SubscribedOnResourcePoolClassifiers) { + SubscribedOnResourcePoolClassifiers = false; + actorContext.Send(NMetadata::NProvider::MakeServiceId(actorContext.SelfID.NodeId()), new NMetadata::NProvider::TEvUnsubscribeExternal(std::make_shared())); + } } private: + void UpdateResourcePoolClassifiersSubscription(TActorContext actorContext) { + if (EnableResourcePools) { + SubscribeOnResourcePoolClassifiers(actorContext); + } else { + UnsubscribeFromResourcePoolClassifiers(actorContext); + } + } + + void SubscribeOnResourcePoolClassifiers(TActorContext actorContext) { + if (!SubscribedOnResourcePoolClassifiers && NMetadata::NProvider::TServiceOperator::IsEnabled()) { + SubscribedOnResourcePoolClassifiers = true; + actorContext.Send(NMetadata::NProvider::MakeServiceId(actorContext.SelfID.NodeId()), new NMetadata::NProvider::TEvSubscribeExternal(std::make_shared())); + } + } + + void UpdateDatabaseResourcePoolClassifiers(const TString& database, TDatabaseInfo& databaseInfo, std::unordered_map&& configsMap, TActorContext actorContext) { + if (databaseInfo.ResourcePoolsClassifiers == configsMap) { + return; + } + + databaseInfo.ResourcePoolsClassifiers.swap(configsMap); + databaseInfo.UserToResourcePool.clear(); + databaseInfo.RankToClassifierInfo.clear(); + for (const auto& [_, classifier] : databaseInfo.ResourcePoolsClassifiers) { + const auto& classifierSettings = classifier.GetClassifierSettings(); + databaseInfo.RankToClassifierInfo.insert({classifier.GetRank(), TClassifierInfo(classifierSettings)}); + if (!PoolsCache.contains(classifierSettings.ResourcePool)) { + actorContext.Send(MakeKqpWorkloadServiceId(actorContext.SelfID.NodeId()), new NWorkload::TEvSubscribeOnPoolChanges(database, classifierSettings.ResourcePool)); + } + } + } + + TString GetPoolIdFromClassifiers(const TString& database, const TString& userSID, TDatabaseInfo& databaseInfo, const TIntrusiveConstPtr& userToken, TActorContext actorContext) const { + auto& usersMap = databaseInfo.UserToResourcePool; + if (const auto it = usersMap.find(userSID); it != usersMap.end()) { + return it->second; + } + + TString poolId = ""; + for (const auto& [_, classifier] : databaseInfo.RankToClassifierInfo) { + if (classifier.Membername != userSID) { + continue; + } + + auto it = PoolsCache.find(GetPoolKey(database, classifier.PoolId)); + if (it == PoolsCache.end()) { + actorContext.Send(MakeKqpWorkloadServiceId(actorContext.SelfID.NodeId()), new NWorkload::TEvSubscribeOnPoolChanges(database, classifier.PoolId)); + continue; + } + + if (userToken && !userToken->GetSerializedToken().empty() && !it->second.SecurityObject->CheckAccess(NACLib::DescribeSchema | NACLib::SelectRow, *userToken)) { + continue; + } + + poolId = classifier.PoolId; + break; + } + + usersMap[userSID] = poolId; + return poolId; + } + + TDatabaseInfo* GetOrCreateDatabaseInfo(const TString& database) { + const TString& path = CanonizePath(database); + if (const auto it = DatabasesCache.find(path); it != DatabasesCache.end()) { + return &it->second; + } + return &DatabasesCache.insert({path, TDatabaseInfo{}}).first->second; + } + + const TDatabaseInfo* GetDatabaseInfo(const TString& database) const { + const auto it = DatabasesCache.find(CanonizePath(database)); + return it != DatabasesCache.end() ? &it->second : nullptr; + } + static TString GetPoolKey(const TString& database, const TString& poolId) { return CanonizePath(TStringBuilder() << database << "/" << poolId); } private: std::unordered_map PoolsCache; + std::unordered_map DatabasesCache; + + bool EnableResourcePools = false; + bool EnableResourcePoolsOnServerless = false; + bool SubscribedOnResourcePoolClassifiers = false; }; } // namespace NKikimr::NKqp diff --git a/ydb/core/kqp/proxy_service/ya.make b/ydb/core/kqp/proxy_service/ya.make index a69e5c7ca7b2..e1c2b9e1b76a 100644 --- a/ydb/core/kqp/proxy_service/ya.make +++ b/ydb/core/kqp/proxy_service/ya.make @@ -17,6 +17,7 @@ PEERDIR( ydb/core/kqp/common ydb/core/kqp/common/events ydb/core/kqp/counters + ydb/core/kqp/gateway/behaviour/resource_pool_classifier ydb/core/kqp/proxy_service/proto ydb/core/kqp/run_script_actor ydb/core/kqp/workload_service diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp index 65bd3b1b5c45..b5ac2f30b6eb 100644 --- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp +++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp @@ -1357,7 +1357,7 @@ class TKqpSessionActor : public TActorBootstrapped { executionStats.Swap(&stats); stats = QueryState->QueryStats.ToProto(); stats.MutableExecutions()->MergeFrom(executionStats.GetExecutions()); - ev->Get()->Record.SetQueryPlan(SerializeAnalyzePlan(stats)); + ev->Get()->Record.SetQueryPlan(SerializeAnalyzePlan(stats, QueryState->UserRequestContext->PoolId)); } } @@ -1599,7 +1599,7 @@ class TKqpSessionActor : public TActorBootstrapped { if (QueryState->ReportStats()) { auto stats = QueryState->QueryStats.ToProto(); if (QueryState->GetStatsMode() >= Ydb::Table::QueryStatsCollection::STATS_COLLECTION_FULL) { - response->SetQueryPlan(SerializeAnalyzePlan(stats)); + response->SetQueryPlan(SerializeAnalyzePlan(stats, QueryState->UserRequestContext->PoolId)); response->SetQueryAst(QueryState->CompileResult->PreparedQuery->GetPhysicalQuery().GetQueryAst()); } response->MutableQueryStats()->Swap(&stats); diff --git a/ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp b/ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp index 1c10f842f188..6d3f0b819dc7 100644 --- a/ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp +++ b/ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp @@ -268,6 +268,47 @@ Y_UNIT_TEST_SUITE(KqpQueryService) { } } + Y_UNIT_TEST(ExecuteQueryWithResourcePoolClassifier) { + NKikimrConfig::TAppConfig config; + config.MutableFeatureFlags()->SetEnableResourcePools(true); + + auto kikimr = TKikimrRunner(TKikimrSettings() + .SetAppConfig(config) + .SetEnableResourcePools(true)); + auto db = kikimr.GetQueryClient(); + + const TString userSID = TStringBuilder() << "test@" << BUILTIN_ACL_DOMAIN; + const TString schemeSql = TStringBuilder() << R"( + CREATE RESOURCE POOL MyPool WITH ( + CONCURRENT_QUERY_LIMIT=0 + ); + CREATE RESOURCE POOL CLASSIFIER MyPoolClassifier WITH ( + RESOURCE_POOL="MyPool", + MEMBERNAME=")" << userSID << R"(" + ); + GRANT ALL ON `/Root` TO `)" << userSID << R"(`; + )"; + auto schemeResult = db.ExecuteQuery(schemeSql, TTxControl::NoTx()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(schemeResult.GetStatus(), EStatus::SUCCESS, schemeResult.GetIssues().ToString()); + + auto testUserClient = kikimr.GetQueryClient(TClientSettings().AuthToken(userSID)); + const TDuration timeout = TDuration::Seconds(5); + const TInstant start = TInstant::Now(); + while (TInstant::Now() - start <= timeout) { + const TString query = "SELECT 42;"; + auto result = testUserClient.ExecuteQuery(query, TTxControl::BeginTx().CommitTx()).ExtractValueSync(); + if (!result.IsSuccess()) { + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::PRECONDITION_FAILED, result.GetIssues().ToString()); + UNIT_ASSERT_STRING_CONTAINS(result.GetIssues().ToString(), "Resource pool MyPool was disabled due to zero concurrent query limit"); + return; + } + + Cerr << "Wait resource pool classifier " << TInstant::Now() - start << ": status = " << result.GetStatus() << ", issues = " << result.GetIssues().ToOneLineString() << "\n"; + Sleep(TDuration::Seconds(1)); + } + UNIT_ASSERT_C(false, "Waiting resource pool classifier timeout. Spent time " << TInstant::Now() - start << " exceeds limit " << timeout); + } + std::pair CalcRowsAndBatches(TExecuteQueryIterator& it) { ui32 totalRows = 0; ui32 totalBatches = 0; diff --git a/ydb/core/kqp/workload_service/actors/pool_handlers_acors.cpp b/ydb/core/kqp/workload_service/actors/pool_handlers_acors.cpp index 921944a0db41..71b9e1ae39d7 100644 --- a/ydb/core/kqp/workload_service/actors/pool_handlers_acors.cpp +++ b/ydb/core/kqp/workload_service/actors/pool_handlers_acors.cpp @@ -138,6 +138,7 @@ class TPoolHandlerActorBase : public TActor { sFunc(TEvents::TEvPoison, HandlePoison); sFunc(TEvPrivate::TEvStopPoolHandler, HandleStop); hFunc(TEvPrivate::TEvResolvePoolResponse, Handle); + hFunc(TEvPrivate::TEvUpdatePoolSubscription, Handle); // Pool handler events hFunc(TEvPrivate::TEvCancelRequest, Handle); @@ -157,7 +158,7 @@ class TPoolHandlerActorBase : public TActor { this->Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvWatchRemove(0)); } - SendPoolInfoUpdate(std::nullopt, std::nullopt); + SendPoolInfoUpdate(std::nullopt, std::nullopt, Subscribers); Counters.OnCleanup(); @@ -180,6 +181,8 @@ class TPoolHandlerActorBase : public TActor { } void Handle(TEvPrivate::TEvResolvePoolResponse::TPtr& ev) { + this->Send(MakeKqpWorkloadServiceId(this->SelfId().NodeId()), new TEvPrivate::TEvPlaceRequestIntoPoolResponse(Database, PoolId)); + auto event = std::move(ev->Get()->Event); const TActorId& workerActorId = event->Sender; if (!InFlightLimit) { @@ -204,8 +207,6 @@ class TPoolHandlerActorBase : public TActor { UpdatePoolConfig(ev->Get()->PoolConfig); UpdateSchemeboardSubscription(ev->Get()->PathId); OnScheduleRequest(request); - - this->Send(MakeKqpWorkloadServiceId(this->SelfId().NodeId()), new TEvPrivate::TEvPlaceRequestIntoPoolResponse(Database, PoolId)); } void Handle(TEvCleanupRequest::TPtr& ev) { @@ -241,6 +242,14 @@ class TPoolHandlerActorBase : public TActor { OnCleanupRequest(request); } + void Handle(TEvPrivate::TEvUpdatePoolSubscription::TPtr& ev) { + const auto& newSubscribers = ev->Get()->Subscribers; + if (!UpdateSchemeboardSubscription(ev->Get()->PathId)) { + SendPoolInfoUpdate(PoolConfig, SecurityObject, newSubscribers); + } + Subscribers.insert(newSubscribers.begin(), newSubscribers.end()); + } + void Handle(TEvTxProxySchemeCache::TEvWatchNotifyUpdated::TPtr& ev) { if (ev->Get()->Key != WatchKey) { // Skip old paths watch notifications @@ -265,12 +274,11 @@ class TPoolHandlerActorBase : public TActor { UpdatePoolConfig(poolConfig); const auto& pathDescription = result->GetPathDescription().GetSelf(); - NACLib::TSecurityObject object(pathDescription.GetOwner(), false); - if (object.MutableACL()->ParseFromString(pathDescription.GetEffectiveACL())) { - SendPoolInfoUpdate(poolConfig, object); - } else { - SendPoolInfoUpdate(poolConfig, std::nullopt); + SecurityObject = NACLib::TSecurityObject(pathDescription.GetOwner(), false); + if (!SecurityObject->MutableACL()->ParseFromString(pathDescription.GetEffectiveACL())) { + SecurityObject = std::nullopt; } + SendPoolInfoUpdate(poolConfig, SecurityObject, Subscribers); } void Handle(TEvTxProxySchemeCache::TEvWatchNotifyDeleted::TPtr& ev) { @@ -280,7 +288,7 @@ class TPoolHandlerActorBase : public TActor { } LOG_D("Got delete notification"); - SendPoolInfoUpdate(std::nullopt, std::nullopt); + SendPoolInfoUpdate(std::nullopt, std::nullopt, Subscribers); } public: @@ -346,8 +354,10 @@ class TPoolHandlerActorBase : public TActor { RemoveRequest(request); } - void SendPoolInfoUpdate(const std::optional& config, const std::optional& securityObject) const { - this->Send(MakeKqpProxyID(this->SelfId().NodeId()), new TEvUpdatePoolInfo(Database, PoolId, config, securityObject)); + void SendPoolInfoUpdate(const std::optional& config, const std::optional& securityObject, const std::unordered_set& subscribers) const { + for (const auto& subscriber : subscribers) { + this->Send(subscriber, new TEvUpdatePoolInfo(Database, PoolId, config, securityObject)); + } } protected: @@ -437,9 +447,9 @@ class TPoolHandlerActorBase : public TActor { LOG_I("Cancel request for worker " << request->WorkerActorId << ", session id: " << request->SessionId << ", local in flight: " << LocalInFlight); } - void UpdateSchemeboardSubscription(TPathId pathId) { + bool UpdateSchemeboardSubscription(TPathId pathId) { if (WatchPathId && *WatchPathId == pathId) { - return; + return false; } if (WatchPathId) { @@ -452,6 +462,7 @@ class TPoolHandlerActorBase : public TActor { LOG_D("Subscribed on schemeboard notifications for path: " << pathId.ToString()); WatchPathId = std::make_unique(pathId); this->Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvWatchPathId(*WatchPathId, WatchKey)); + return true; } void UpdatePoolConfig(const NResourcePool::TPoolSettings& poolConfig) { @@ -493,10 +504,12 @@ class TPoolHandlerActorBase : public TActor { private: NResourcePool::TPoolSettings PoolConfig; + std::optional SecurityObject; // Scheme board settings std::unique_ptr WatchPathId; ui64 WatchKey = 0; + std::unordered_set Subscribers; // Pool state ui64 LocalInFlight = 0; diff --git a/ydb/core/kqp/workload_service/actors/scheme_actors.cpp b/ydb/core/kqp/workload_service/actors/scheme_actors.cpp index 5cdc839a1cca..5a1e264b50f4 100644 --- a/ydb/core/kqp/workload_service/actors/scheme_actors.cpp +++ b/ydb/core/kqp/workload_service/actors/scheme_actors.cpp @@ -221,7 +221,7 @@ class TPoolFetcherActor : public TSchemeActorBase { } Issues.AddIssues(std::move(issues)); - Send(ReplyActorId, new TEvPrivate::TEvFetchPoolResponse(status, PoolConfig, PathIdFromPathId(PathId), std::move(Issues))); + Send(ReplyActorId, new TEvPrivate::TEvFetchPoolResponse(status, Database, PoolId, PoolConfig, PathIdFromPathId(PathId), std::move(Issues))); PassAway(); } diff --git a/ydb/core/kqp/workload_service/common/events.h b/ydb/core/kqp/workload_service/common/events.h index 0f29d59e9fb0..3d95a655785d 100644 --- a/ydb/core/kqp/workload_service/common/events.h +++ b/ydb/core/kqp/workload_service/common/events.h @@ -30,6 +30,7 @@ struct TEvPrivate { EvResignPoolHandler, EvStopPoolHandler, EvCancelRequest, + EvUpdatePoolSubscription, EvCpuQuotaRequest, EvCpuQuotaResponse, @@ -75,14 +76,18 @@ struct TEvPrivate { }; struct TEvFetchPoolResponse : public NActors::TEventLocal { - TEvFetchPoolResponse(Ydb::StatusIds::StatusCode status, const NResourcePool::TPoolSettings& poolConfig, TPathId pathId, NYql::TIssues issues) + TEvFetchPoolResponse(Ydb::StatusIds::StatusCode status, const TString& database, const TString& poolId, const NResourcePool::TPoolSettings& poolConfig, TPathId pathId, NYql::TIssues issues) : Status(status) + , Database(database) + , PoolId(poolId) , PoolConfig(poolConfig) , PathId(pathId) , Issues(std::move(issues)) {} const Ydb::StatusIds::StatusCode Status; + const TString Database; + const TString PoolId; const NResourcePool::TPoolSettings PoolConfig; const TPathId PathId; const NYql::TIssues Issues; @@ -171,6 +176,16 @@ struct TEvPrivate { const TString SessionId; }; + struct TEvUpdatePoolSubscription : public NActors::TEventLocal { + explicit TEvUpdatePoolSubscription(TPathId pathId, const std::unordered_set& subscribers) + : PathId(pathId) + , Subscribers(subscribers) + {} + + const TPathId PathId; + const std::unordered_set Subscribers; + }; + // Cpu load requests struct TEvCpuQuotaRequest : public NActors::TEventLocal { explicit TEvCpuQuotaRequest(double maxClusterLoad) diff --git a/ydb/core/kqp/workload_service/kqp_workload_service.cpp b/ydb/core/kqp/workload_service/kqp_workload_service.cpp index 295e536d2499..3118f62f651d 100644 --- a/ydb/core/kqp/workload_service/kqp_workload_service.cpp +++ b/ydb/core/kqp/workload_service/kqp_workload_service.cpp @@ -140,6 +140,18 @@ class TKqpWorkloadService : public TActorBootstrapped { } } + void Handle(TEvSubscribeOnPoolChanges::TPtr& ev) { + const TString& database = ev->Get()->Database; + const TString& poolId = ev->Get()->PoolId; + if (!EnabledResourcePools) { + Send(ev->Sender, new TEvUpdatePoolInfo(database, poolId, std::nullopt, std::nullopt)); + return; + } + + LOG_D("Recieved subscription request, Database: " << database << ", PoolId: " << poolId); + GetOrCreateDatabaseState(database)->DoSubscribeRequest(std::move(ev)); + } + void Handle(TEvPlaceRequestIntoPool::TPtr& ev) { const TActorId& workerActorId = ev->Sender; if (!EnabledResourcePools) { @@ -188,11 +200,13 @@ class TKqpWorkloadService : public TActorBootstrapped { hFunc(TEvInterconnect::TEvNodesInfo, Handle); hFunc(TEvents::TEvUndelivered, Handle); + hFunc(TEvSubscribeOnPoolChanges, Handle); hFunc(TEvPlaceRequestIntoPool, Handle); hFunc(TEvCleanupRequest, Handle); hFunc(TEvents::TEvWakeup, Handle); hFunc(TEvPrivate::TEvFetchDatabaseResponse, Handle); + hFunc(TEvPrivate::TEvFetchPoolResponse, Handle); hFunc(TEvPrivate::TEvResolvePoolResponse, Handle); hFunc(TEvPrivate::TEvPlaceRequestIntoPoolResponse, Handle); hFunc(TEvPrivate::TEvNodesInfoRequest, Handle); @@ -211,6 +225,21 @@ class TKqpWorkloadService : public TActorBootstrapped { GetOrCreateDatabaseState(ev->Get()->Database)->UpdateDatabaseInfo(ev); } + void Handle(TEvPrivate::TEvFetchPoolResponse::TPtr& ev) { + const TString& database = ev->Get()->Database; + const TString& poolId = ev->Get()->PoolId; + + TActorId poolHandler; + if (ev->Get()->Status == Ydb::StatusIds::SUCCESS) { + LOG_D("Successfully fetched pool " << poolId << ", Database: " << database); + poolHandler = GetOrCreatePoolState(database, poolId, ev->Get()->PoolConfig)->PoolHandler; + } else { + LOG_W("Failed to fetch pool " << poolId << ", Database: " << database << ", status: " << ev->Get()->Status << ", issues: " << ev->Get()->Issues.ToOneLineString()); + } + + GetOrCreateDatabaseState(database)->UpdatePoolInfo(ev, poolHandler); + } + void Handle(TEvPrivate::TEvResolvePoolResponse::TPtr& ev) { const auto& event = ev->Get()->Event; const TString& database = event->Get()->Database; @@ -226,18 +255,7 @@ class TKqpWorkloadService : public TActorBootstrapped { LOG_D("Successfully fetched pool " << poolId << ", Database: " << database << ", SessionId: " << event->Get()->SessionId); - auto poolState = GetPoolState(database, poolId); - if (!poolState) { - TString poolKey = GetPoolKey(database, poolId); - LOG_I("Creating new handler for pool " << poolKey); - - auto poolHandler = Register(CreatePoolHandlerActor(database, poolId, ev->Get()->PoolConfig, Counters.Counters)); - poolState = &PoolIdToState.insert({poolKey, TPoolState{.PoolHandler = poolHandler, .ActorContext = ActorContext()}}).first->second; - - Counters.ActivePools->Inc(); - ScheduleIdleCheck(); - } - + auto poolState = GetOrCreatePoolState(database, poolId, ev->Get()->PoolConfig); poolState->PendingRequests.emplace(std::move(ev)); poolState->StartPlaceRequest(); } @@ -500,6 +518,23 @@ class TKqpWorkloadService : public TActorBootstrapped { return &DatabaseToState.insert({database, TDatabaseState{.ActorContext = ActorContext(), .EnabledResourcePoolsOnServerless = EnabledResourcePoolsOnServerless}}).first->second; } + TPoolState* GetOrCreatePoolState(const TString& database, const TString& poolId, const NResourcePool::TPoolSettings& poolConfig) { + const auto& poolKey = GetPoolKey(database, poolId); + if (auto poolState = GetPoolState(poolKey)) { + return poolState; + } + + LOG_I("Creating new handler for pool " << poolKey); + + const auto poolHandler = Register(CreatePoolHandlerActor(database, poolId, poolConfig, Counters.Counters)); + const auto poolState = &PoolIdToState.insert({poolKey, TPoolState{.PoolHandler = poolHandler, .ActorContext = ActorContext()}}).first->second; + + Counters.ActivePools->Inc(); + ScheduleIdleCheck(); + + return poolState; + } + TPoolState* GetPoolState(const TString& database, const TString& poolId) { return GetPoolState(GetPoolKey(database, poolId)); } diff --git a/ydb/core/kqp/workload_service/kqp_workload_service_impl.h b/ydb/core/kqp/workload_service/kqp_workload_service_impl.h index 8503a4fb7949..977c7ef3034a 100644 --- a/ydb/core/kqp/workload_service/kqp_workload_service_impl.h +++ b/ydb/core/kqp/workload_service/kqp_workload_service_impl.h @@ -2,6 +2,7 @@ #include +#include #include #include #include @@ -18,11 +19,22 @@ struct TDatabaseState { bool& EnabledResourcePoolsOnServerless; std::vector PendingRequersts = {}; + std::unordered_map> PendingSubscriptions = {}; bool HasDefaultPool = false; bool Serverless = false; TInstant LastUpdateTime = TInstant::Zero(); + void DoSubscribeRequest(TEvSubscribeOnPoolChanges::TPtr ev) { + const TString& poolId = ev->Get()->PoolId; + auto& subscribers = PendingSubscriptions[poolId]; + if (subscribers.empty()) { + ActorContext.Register(CreatePoolFetcherActor(ActorContext.SelfID, ev->Get()->Database, poolId, nullptr)); + } + + subscribers.emplace(ev->Sender); + } + void DoPlaceRequest(TEvPlaceRequestIntoPool::TPtr ev) { TString database = ev->Get()->Database; PendingRequersts.emplace_back(std::move(ev)); @@ -34,12 +46,34 @@ struct TDatabaseState { } } + void UpdatePoolInfo(const TEvPrivate::TEvFetchPoolResponse::TPtr& ev, NActors::TActorId poolHandler) { + const TString& poolId = ev->Get()->PoolId; + auto& subscribers = PendingSubscriptions[poolId]; + if (subscribers.empty()) { + return; + } + + if (ev->Get()->Status == Ydb::StatusIds::SUCCESS && poolHandler) { + ActorContext.Send(poolHandler, new TEvPrivate::TEvUpdatePoolSubscription(ev->Get()->PathId, subscribers)); + } else { + const TString& database = ev->Get()->Database; + for (const auto& subscriber : subscribers) { + ActorContext.Send(subscriber, new TEvUpdatePoolInfo(database, poolId, std::nullopt, std::nullopt)); + } + } + subscribers.clear(); + } + void UpdateDatabaseInfo(const TEvPrivate::TEvFetchDatabaseResponse::TPtr& ev) { if (ev->Get()->Status != Ydb::StatusIds::SUCCESS) { ReplyContinueError(ev->Get()->Status, GroupIssues(ev->Get()->Issues, "Failed to fetch database info")); return; } + if (Serverless != ev->Get()->Serverless) { + ActorContext.Send(MakeKqpProxyID(ActorContext.SelfID.NodeId()), new TEvUpdateDatabaseInfo(ev->Get()->Database, ev->Get()->Serverless)); + } + LastUpdateTime = TInstant::Now(); Serverless = ev->Get()->Serverless; StartPendingRequests(); diff --git a/ydb/core/kqp/workload_service/ut/common/kqp_workload_service_ut_common.cpp b/ydb/core/kqp/workload_service/ut/common/kqp_workload_service_ut_common.cpp index b624c57bdd94..44c2606d22ce 100644 --- a/ydb/core/kqp/workload_service/ut/common/kqp_workload_service_ut_common.cpp +++ b/ydb/core/kqp/workload_service/ut/common/kqp_workload_service_ut_common.cpp @@ -283,15 +283,8 @@ class TWorkloadServiceYdbSetup : public IYdbSetup { return; } - NResourcePool::TPoolSettings poolConfig; - poolConfig.ConcurrentQueryLimit = Settings_.ConcurrentQueryLimit_; - poolConfig.QueueSize = Settings_.QueueSize_; - poolConfig.QueryCancelAfter = Settings_.QueryCancelAfter_; - poolConfig.QueryMemoryLimitPercentPerNode = Settings_.QueryMemoryLimitPercentPerNode_; - poolConfig.DatabaseLoadCpuThreshold = Settings_.DatabaseLoadCpuThreshold_; - TActorId edgeActor = GetRuntime()->AllocateEdgeActor(); - GetRuntime()->Register(CreatePoolCreatorActor(edgeActor, Settings_.DomainName_, Settings_.PoolId_, poolConfig, nullptr, {})); + GetRuntime()->Register(CreatePoolCreatorActor(edgeActor, Settings_.DomainName_, Settings_.PoolId_, Settings_.GetDefaultPoolSettings(), nullptr, {})); auto response = GetRuntime()->GrabEdgeEvent(edgeActor, FUTURE_WAIT_TIMEOUT); UNIT_ASSERT_VALUES_EQUAL_C(response->Get()->Status, Ydb::StatusIds::SUCCESS, response->Get()->Issues.ToOneLineString()); } @@ -480,6 +473,8 @@ class TWorkloadServiceYdbSetup : public IYdbSetup { } std::unique_ptr GetQueryRequest(const TString& query, const TQueryRunnerSettings& settings) const { + UNIT_ASSERT_C(settings.PoolId_, "Query pool id is not specified"); + auto event = std::make_unique(); event->Record.SetUserToken(NACLib::TUserToken("", settings.UserSID_, {}).SerializeAsString()); @@ -488,7 +483,7 @@ class TWorkloadServiceYdbSetup : public IYdbSetup { request->SetType(NKikimrKqp::QUERY_TYPE_SQL_GENERIC_QUERY); request->SetAction(NKikimrKqp::QUERY_ACTION_EXECUTE); request->SetDatabase(Settings_.DomainName_); - request->SetPoolId(settings.PoolId_); + request->SetPoolId(*settings.PoolId_); return event; } @@ -576,6 +571,16 @@ bool TQueryRunnerResultAsync::HasValue() const { //// TYdbSetupSettings +NResourcePool::TPoolSettings TYdbSetupSettings::GetDefaultPoolSettings() const { + NResourcePool::TPoolSettings poolConfig; + poolConfig.ConcurrentQueryLimit = ConcurrentQueryLimit_; + poolConfig.QueueSize = QueueSize_; + poolConfig.QueryCancelAfter = QueryCancelAfter_; + poolConfig.QueryMemoryLimitPercentPerNode = QueryMemoryLimitPercentPerNode_; + poolConfig.DatabaseLoadCpuThreshold = DatabaseLoadCpuThreshold_; + return poolConfig; +} + TIntrusivePtr TYdbSetupSettings::Create() const { return MakeIntrusive(*this); } diff --git a/ydb/core/kqp/workload_service/ut/common/kqp_workload_service_ut_common.h b/ydb/core/kqp/workload_service/ut/common/kqp_workload_service_ut_common.h index 35f1a1693140..4aade4462eb3 100644 --- a/ydb/core/kqp/workload_service/ut/common/kqp_workload_service_ut_common.h +++ b/ydb/core/kqp/workload_service/ut/common/kqp_workload_service_ut_common.h @@ -24,7 +24,7 @@ struct TQueryRunnerSettings { // Query settings FLUENT_SETTING_DEFAULT(ui32, NodeIndex, 0); - FLUENT_SETTING_DEFAULT(TString, PoolId, ""); + FLUENT_SETTING_DEFAULT(std::optional, PoolId, std::nullopt); FLUENT_SETTING_DEFAULT(TString, UserSID, "user@" BUILTIN_SYSTEM_DOMAIN); // Runner settings @@ -76,6 +76,7 @@ struct TYdbSetupSettings { FLUENT_SETTING_DEFAULT(double, QueryMemoryLimitPercentPerNode, -1); FLUENT_SETTING_DEFAULT(double, DatabaseLoadCpuThreshold, -1); + NResourcePool::TPoolSettings GetDefaultPoolSettings() const; TIntrusivePtr Create() const; }; diff --git a/ydb/core/kqp/workload_service/ut/kqp_workload_service_actors_ut.cpp b/ydb/core/kqp/workload_service/ut/kqp_workload_service_actors_ut.cpp index 8d6880d3eb58..0dab28acc6b7 100644 --- a/ydb/core/kqp/workload_service/ut/kqp_workload_service_actors_ut.cpp +++ b/ydb/core/kqp/workload_service/ut/kqp_workload_service_actors_ut.cpp @@ -1,5 +1,6 @@ #include +#include #include #include @@ -164,4 +165,92 @@ Y_UNIT_TEST_SUITE(KqpWorkloadServiceActors) { } } +Y_UNIT_TEST_SUITE(KqpWorkloadServiceSubscriptions) { + TActorId SubscribeOnPool(TIntrusivePtr ydb) { + const auto& settings = ydb->GetSettings(); + auto& runtime = *ydb->GetRuntime(); + const auto& edgeActor = runtime.AllocateEdgeActor(); + + runtime.Send(MakeKqpWorkloadServiceId(runtime.GetNodeId()), edgeActor, new TEvSubscribeOnPoolChanges(settings.DomainName_, settings.PoolId_)); + const auto& response = runtime.GrabEdgeEvent(edgeActor, FUTURE_WAIT_TIMEOUT); + UNIT_ASSERT_C(response, "Subscription update not found"); + + const auto& config = response->Get()->Config; + UNIT_ASSERT_C(config, "Pool config not found"); + UNIT_ASSERT_C(*config == settings.GetDefaultPoolSettings(), "Unexpected pool config"); + + const auto& securityObject = response->Get()->SecurityObject; + UNIT_ASSERT_C(securityObject, "Security object not found"); + UNIT_ASSERT_VALUES_EQUAL_C(securityObject->GetOwnerSID(), BUILTIN_ACL_ROOT, "Unexpected owner user SID"); + + return edgeActor; + } + + Y_UNIT_TEST(TestResourcePoolSubscription) { + auto ydb = TYdbSetupSettings() + .QueueSize(10) + .ConcurrentQueryLimit(5) + .QueryCancelAfter(TDuration::Seconds(42)) + .QueryMemoryLimitPercentPerNode(55.0) + .DatabaseLoadCpuThreshold(30.0) + .Create(); + + SubscribeOnPool(ydb); + } + + Y_UNIT_TEST(TestResourcePoolSubscriptionAfterAlter) { + auto ydb = TYdbSetupSettings().Create(); + + const auto& subscriber = SubscribeOnPool(ydb); + + ydb->ExecuteSchemeQuery(TStringBuilder() << R"( + ALTER RESOURCE POOL )" << ydb->GetSettings().PoolId_ << R"( SET ( + QUEUE_SIZE=42 + ); + )"); + + const auto& response = ydb->GetRuntime()->GrabEdgeEvent(subscriber, FUTURE_WAIT_TIMEOUT); + UNIT_ASSERT_C(response, "Subscription update not found"); + + const auto& config = response->Get()->Config; + UNIT_ASSERT_C(config, "Pool config not found"); + UNIT_ASSERT_VALUES_EQUAL(config->QueueSize, 42); + } + + Y_UNIT_TEST(TestResourcePoolSubscriptionAfterAclChange) { + auto ydb = TYdbSetupSettings().Create(); + + const auto& subscriber = SubscribeOnPool(ydb); + + const TString& userSID = "test@user"; + ydb->ExecuteSchemeQuery(TStringBuilder() << R"( + GRANT ALL ON `/Root/.resource_pools/)" << ydb->GetSettings().PoolId_ << R"(` TO `)" << userSID << R"(`; + )"); + + const auto& response = ydb->GetRuntime()->GrabEdgeEvent(subscriber, FUTURE_WAIT_TIMEOUT); + UNIT_ASSERT_C(response, "Subscription update not found"); + + const auto& securityObject = response->Get()->SecurityObject; + UNIT_ASSERT_C(securityObject, "Security object not found"); + + NACLib::TUserToken token("", userSID, {}); + UNIT_ASSERT_C(securityObject->CheckAccess(NACLib::GenericFull, token), TStringBuilder() << "Unexpected pool access rights: " << securityObject->ToString()); + } + + Y_UNIT_TEST(TestResourcePoolSubscriptionAfterDrop) { + auto ydb = TYdbSetupSettings().Create(); + + const auto& subscriber = SubscribeOnPool(ydb); + + ydb->ExecuteSchemeQuery(TStringBuilder() << R"( + DROP RESOURCE POOL )" << ydb->GetSettings().PoolId_ << R"(; + )"); + + const auto& response = ydb->GetRuntime()->GrabEdgeEvent(subscriber, FUTURE_WAIT_TIMEOUT); + UNIT_ASSERT_C(response, "Subscription update not found"); + UNIT_ASSERT_C(!response->Get()->Config, "Unexpected pool config"); + UNIT_ASSERT_C(!response->Get()->SecurityObject, "Unexpected security object"); + } +} + } // namespace NKikimr::NKqp diff --git a/ydb/core/kqp/workload_service/ut/kqp_workload_service_ut.cpp b/ydb/core/kqp/workload_service/ut/kqp_workload_service_ut.cpp index 56c6e7480077..f14c33036ea4 100644 --- a/ydb/core/kqp/workload_service/ut/kqp_workload_service_ut.cpp +++ b/ydb/core/kqp/workload_service/ut/kqp_workload_service_ut.cpp @@ -259,10 +259,12 @@ Y_UNIT_TEST_SUITE(KqpWorkloadService) { TSampleQueries::TSelect42::CheckResult(ydb->ExecuteQuery(TSampleQueries::TSelect42::Query)); TSampleQueries::TSelect42::CheckResult(ydb->ExecuteQuery(TSampleQueries::TSelect42::Query, TQueryRunnerSettings().PoolId(NResourcePool::DEFAULT_POOL_ID))); - ydb->WaitPoolHandlersCount(0, 2, TDuration::Seconds(95)); + ydb->ExecuteSchemeQuery(TStringBuilder() << R"( + DROP RESOURCE POOL )" << ydb->GetSettings().PoolId_ << R"(; + DROP RESOURCE POOL )" << NResourcePool::DEFAULT_POOL_ID << R"(; + )"); - // Check pool creation after cleanup - TSampleQueries::TSelect42::CheckResult(ydb->ExecuteQuery(TSampleQueries::TSelect42::Query)); + ydb->WaitPoolHandlersCount(0, std::nullopt, TDuration::Seconds(95)); } } @@ -516,7 +518,7 @@ Y_UNIT_TEST_SUITE(ResourcePoolsDdl) { } Y_UNIT_TEST_SUITE(ResourcePoolClassifiersDdl) { - Y_UNIT_TEST(ResourcePoolClassifiersPermissions) { + Y_UNIT_TEST(TestResourcePoolClassifiersPermissions) { auto ydb = TYdbSetupSettings().Create(); const TString& userSID = "user@test"; @@ -551,6 +553,138 @@ Y_UNIT_TEST_SUITE(ResourcePoolClassifiersDdl) { UNIT_ASSERT_VALUES_EQUAL_C(alterResult.GetStatus(), EStatus::GENERIC_ERROR, alterResult.GetIssues().ToOneLineString()); UNIT_ASSERT_STRING_CONTAINS(alterResult.GetIssues().ToOneLineString(), "You don't have access permissions for database Root"); } + + TString CreateSampleResourcePoolClassifier(TIntrusivePtr ydb, const TQueryRunnerSettings& settings, const TString& poolId) { + const TString& classifierId = "my_pool_classifier"; + ydb->ExecuteSchemeQuery(TStringBuilder() << R"( + CREATE RESOURCE POOL )" << poolId << R"( WITH ( + CONCURRENT_QUERY_LIMIT=0 + ); + CREATE RESOURCE POOL CLASSIFIER )" << classifierId << R"( WITH ( + RESOURCE_POOL=")" << poolId << R"(", + MEMBERNAME=")" << settings.UserSID_ << R"(" + ); + )"); + + return classifierId; + } + + void WaitForFail(TIntrusivePtr ydb, const TQueryRunnerSettings& settings, const TString& poolId) { + ydb->WaitFor(TDuration::Seconds(5), "Resource pool classifier fail", [ydb, settings, poolId](TString& errorString) { + auto result = ydb->ExecuteQuery(TSampleQueries::TSelect42::Query, settings); + + errorString = result.GetIssues().ToOneLineString(); + return result.GetStatus() == EStatus::PRECONDITION_FAILED && errorString.Contains(TStringBuilder() << "Resource pool " << poolId << " was disabled due to zero concurrent query limit"); + }); + } + + void WaitForSuccess(TIntrusivePtr ydb, const TQueryRunnerSettings& settings) { + ydb->WaitFor(TDuration::Seconds(5), "Resource pool classifier success", [ydb, settings](TString& errorString) { + auto result = ydb->ExecuteQuery(TSampleQueries::TSelect42::Query, settings); + + errorString = result.GetIssues().ToOneLineString(); + return result.GetStatus() == EStatus::SUCCESS; + }); + } + + Y_UNIT_TEST(TestCreateResourcePoolClassifier) { + auto ydb = TYdbSetupSettings().Create(); + + auto settings = TQueryRunnerSettings().PoolId(""); + const TString& poolId = "my_pool"; + CreateSampleResourcePoolClassifier(ydb, settings, poolId); + + WaitForFail(ydb, settings, poolId); + } + + Y_UNIT_TEST(TestAlterResourcePoolClassifier) { + auto ydb = TYdbSetupSettings().Create(); + + auto settings = TQueryRunnerSettings().PoolId(""); + const TString& poolId = "my_pool"; + const TString& classifierId = CreateSampleResourcePoolClassifier(ydb, settings, poolId); + + WaitForFail(ydb, settings, poolId); + + ydb->ExecuteSchemeQuery(TStringBuilder() << R"( + ALTER RESOURCE POOL CLASSIFIER )" << classifierId << R"( RESET ( + RESOURCE_POOL + ); + )"); + + WaitForSuccess(ydb, settings); + } + + Y_UNIT_TEST(TestDropResourcePoolClassifier) { + auto ydb = TYdbSetupSettings().Create(); + + auto settings = TQueryRunnerSettings().PoolId(""); + const TString& poolId = "my_pool"; + const TString& classifierId = CreateSampleResourcePoolClassifier(ydb, settings, poolId); + + WaitForFail(ydb, settings, poolId); + + ydb->ExecuteSchemeQuery(TStringBuilder() << R"( + DROP RESOURCE POOL CLASSIFIER )" << classifierId << R"(; + )"); + + WaitForSuccess(ydb, settings); + } + + Y_UNIT_TEST(TestDropResourcePool) { + auto ydb = TYdbSetupSettings().Create(); + + auto settings = TQueryRunnerSettings().PoolId(""); + const TString& poolId = "my_pool"; + CreateSampleResourcePoolClassifier(ydb, settings, poolId); + + WaitForFail(ydb, settings, poolId); + + ydb->ExecuteSchemeQuery(TStringBuilder() << R"( + DROP RESOURCE POOL )" << poolId << R"(; + )"); + + WaitForSuccess(ydb, settings); + } + + Y_UNIT_TEST(TestResourcePoolClassifierRanks) { + auto ydb = TYdbSetupSettings().Create(); + + auto settings = TQueryRunnerSettings().PoolId(""); + const TString& poolId = "my_pool"; + CreateSampleResourcePoolClassifier(ydb, settings, poolId); + + WaitForFail(ydb, settings, poolId); + + const TString& classifierId = "rank_classifier"; + ydb->ExecuteSchemeQuery(TStringBuilder() << R"( + CREATE RESOURCE POOL CLASSIFIER )" << classifierId << R"( WITH ( + RANK="1", + MEMBERNAME=")" << settings.UserSID_ << R"(" + ); + )"); + + WaitForSuccess(ydb, settings); + + ydb->ExecuteSchemeQuery(TStringBuilder() << R"( + ALTER RESOURCE POOL CLASSIFIER )" << classifierId << R"( RESET ( + RANK + ); + )"); + + WaitForFail(ydb, settings, poolId); + } + + Y_UNIT_TEST(TestExplicitPoolId) { + auto ydb = TYdbSetupSettings().Create(); + + auto settings = TQueryRunnerSettings().PoolId(""); + const TString& poolId = "my_pool"; + CreateSampleResourcePoolClassifier(ydb, settings, poolId); + + WaitForFail(ydb, settings, poolId); + TSampleQueries::TSelect42::CheckResult(ydb->ExecuteQuery(TSampleQueries::TSelect42::Query, TQueryRunnerSettings().PoolId(NResourcePool::DEFAULT_POOL_ID))); + } } } // namespace NKikimr::NKqp