2020
2121#include < ydb/library/yql/utils/yql_panic.h>
2222
23+ #include < library/cpp/containers/absl_flat_hash/flat_hash_map.h>
24+
2325namespace NKikimr {
2426namespace NKqp {
2527namespace NRm {
@@ -44,11 +46,14 @@ using namespace NResourceBroker;
4446namespace {
4547
4648template <typename T>
47- class TLimitedResource {
49+ class TLimitedResource : public TAtomicRefCount <TLimitedResource<T>> {
4850public:
49- explicit TLimitedResource (T limit)
51+ explicit TLimitedResource (T limit, ui64 overPercent )
5052 : Limit(limit)
51- , Used(0 ) {}
53+ , Used(0 )
54+ , OverPercent(overPercent)
55+ , ResourceIsOverCookie(MakeIntrusive<TMemoryResourceCookie>())
56+ {}
5257
5358 T Available () const {
5459 return Limit > Used ? Limit - Used : 0 ;
@@ -58,26 +63,38 @@ class TLimitedResource {
5863 return Available () >= amount;
5964 }
6065
61- bool Acquire (T value) {
66+ bool AcquireIfAvailable (T value) {
6267 if (Available () >= value) {
6368 Used += value;
69+ UpdateCookie ();
6470 return true ;
6571 }
6672 return false ;
6773 }
6874
75+ void UpdateCookie () {
76+ ui64 availableTreshold = GetLimit () / 100 * (100 - OverPercent);
77+ ResourceIsOverCookie->SpillingPercentReached .store (Available () < availableTreshold);
78+ }
79+
6980 void Release (T value) {
7081 if (Used > value) {
7182 Used -= value;
7283 } else {
7384 Used = 0 ;
7485 }
86+
87+ UpdateCookie ();
7588 }
7689
7790 void SetNewLimit (T limit) {
7891 Limit = limit;
7992 }
8093
94+ void SetOverPercent (ui64 overPercent) {
95+ OverPercent = overPercent;
96+ }
97+
8198 T GetLimit () const {
8299 return Limit;
83100 }
@@ -89,6 +106,11 @@ class TLimitedResource {
89106private:
90107 T Limit;
91108 T Used;
109+
110+ ui64 OverPercent = 100 ;
111+
112+ public:
113+ TIntrusivePtr<TMemoryResourceCookie> ResourceIsOverCookie;
92114};
93115
94116struct TEvPrivate {
@@ -112,7 +134,8 @@ class TKqpResourceManager : public IKqpResourceManager {
112134 : Counters(counters)
113135 , ExecutionUnitsResource(config.GetComputeActorsCount())
114136 , ExecutionUnitsLimit(config.GetComputeActorsCount())
115- , ScanQueryMemoryResource(config.GetQueryMemoryLimit())
137+ , SpillingPercent(config.GetSpillingPrecent())
138+ , TotalMemoryResource(MakeIntrusive<TLimitedResource<ui64>>(config.GetQueryMemoryLimit(), config.GetSpillingPrecent()))
116139 {
117140 SetConfigValues (config);
118141 }
@@ -188,11 +211,7 @@ class TKqpResourceManager : public IKqpResourceManager {
188211 }
189212
190213 bool hasScanQueryMemory = true ;
191- ui64 queryMemoryLimit = 0 ;
192214
193- // NOTE(gvit): the first memory request always satisfied.
194- // all other requests are not guaranteed to be satisfied.
195- // In the nearest future we need to implement several layers of memory requests.
196215 bool isFirstAllocationRequest = (resources.ExecutionUnits > 0 && resources.MemoryPool == EKqpMemoryPool::DataQuery);
197216 if (isFirstAllocationRequest) {
198217 TKqpResourcesRequest newRequest = resources;
@@ -210,17 +229,41 @@ class TKqpResourceManager : public IKqpResourceManager {
210229 return result;
211230 }
212231
213- hasScanQueryMemory = ScanQueryMemoryResource.Has (resources.Memory );
214- if (hasScanQueryMemory) {
215- ScanQueryMemoryResource.Acquire (resources.Memory );
216- queryMemoryLimit = QueryMemoryLimit.load ();
232+ hasScanQueryMemory = TotalMemoryResource->AcquireIfAvailable (resources.Memory );
233+ task->TotalMemoryCookie = TotalMemoryResource->ResourceIsOverCookie ;
234+
235+ if (hasScanQueryMemory && !tx->PoolId .empty () && tx->MemoryPoolPercent > 0 ) {
236+ ui64 poolLimit = 1_MB;
237+
238+ {
239+ double poolLimitDouble = static_cast <double >(TotalMemoryResource->GetLimit ()) * tx->MemoryPoolPercent ;
240+ poolLimit = std::max (poolLimit, static_cast <ui64>(poolLimitDouble));
241+ }
242+
243+ auto [it, success] = MemoryNamedPools.emplace (tx->PoolId , nullptr );
244+
245+ if (success) {
246+ it->second = MakeIntrusive<TLimitedResource<ui64>>(poolLimit, SpillingPercent.load ());
247+ } else if (it->second ->GetLimit () != poolLimit) {
248+ it->second ->SetNewLimit (poolLimit);
249+ it->second ->SetOverPercent (SpillingPercent.load ());
250+ }
251+
252+ auto & poolMemory = it->second ;
253+ if (!poolMemory->AcquireIfAvailable (resources.Memory )) {
254+ hasScanQueryMemory = false ;
255+ TotalMemoryResource->Release (resources.Memory );
256+ }
257+
258+ task->PoolMemoryCookie = poolMemory->ResourceIsOverCookie ;
217259 }
218- } // with_lock (Lock)
260+ }
219261
220262 if (!hasScanQueryMemory) {
221263 Counters->RmNotEnoughMemory ->Inc ();
222264 TStringBuilder reason;
223- reason << " TxId: " << txId << " , taskId: " << taskId << " . Not enough memory for query, requested: " << resources.Memory ;
265+ reason << " TxId: " << txId << " , taskId: " << taskId << " . Not enough memory for query, requested: " << resources.Memory
266+ << " . " << tx->ToString ();
224267 result.SetError (NKikimrKqp::TEvStartKqpTasksResponse::NOT_ENOUGH_MEMORY, reason);
225268 return result;
226269 }
@@ -232,28 +275,23 @@ class TKqpResourceManager : public IKqpResourceManager {
232275 if (!result) {
233276 Counters->RmNotEnoughMemory ->Inc ();
234277 with_lock (Lock) {
235- ScanQueryMemoryResource.Release (resources.Memory );
236- } // with_lock (Lock)
278+ TotalMemoryResource->Release (resources.Memory );
279+ auto it = MemoryNamedPools.find (tx->PoolId );
280+ if (it != MemoryNamedPools.end ()) {
281+ it->second ->Release (resources.Memory );
282+ }
283+ }
237284 }
238285 };
239286
240- ui64 txTotalRequestedMemory = tx->GetExtraMemoryAllocatedSize () + resources.Memory ;
241- if (txTotalRequestedMemory > queryMemoryLimit) {
242- TStringBuilder reason;
243- reason << " TxId: " << txId << " , taskId: " << taskId << " . Query memory limit exceeded: "
244- << " requested " << txTotalRequestedMemory;
245- result.SetError (NKikimrKqp::TEvStartKqpTasksResponse::QUERY_MEMORY_LIMIT_EXCEEDED, reason);
246- return result;
247- }
248-
249287 bool allocated = ResourceBroker->SubmitTaskInstant (
250288 TEvResourceBroker::TEvSubmitTask (rbTaskId, rbTaskName, {0 , resources.Memory }, " kqp_query" , 0 , {}),
251289 SelfId);
252290
253291 if (!allocated) {
254292 TStringBuilder reason;
255- reason << " TxId: " << txId << " , taskId: " << taskId << " . Not enough ScanQueryMemory : "
256- << " requested " << resources. Memory ;
293+ reason << " TxId: " << txId << " , taskId: " << taskId << " . Not enough memory for query, requested : " << resources. Memory
294+ << " . " << tx-> ToString () ;
257295 LOG_AS_N (reason);
258296 result.SetError (NKikimrKqp::TEvStartKqpTasksResponse::NOT_ENOUGH_MEMORY, reason);
259297 return result;
@@ -302,14 +340,19 @@ class TKqpResourceManager : public IKqpResourceManager {
302340
303341 if (resources.Memory > 0 ) {
304342 with_lock (Lock) {
305- ScanQueryMemoryResource.Release (resources.Memory );
306- } // with_lock (Lock)
343+ TotalMemoryResource->Release (resources.Memory );
344+ auto it = MemoryNamedPools.find (tx->PoolId );
345+ if (it != MemoryNamedPools.end ()) {
346+ it->second ->Release (resources.Memory );
347+ }
348+ }
307349 }
308350
309- LOG_AS_D (" TxId: " << tx->TxId << " , taskId: " << task->TaskId << " . Released resources, "
310- << " ScanQueryMemory: " << resources.Memory << " , "
311- << " ExternalDataQueryMemory " << resources.ExternalMemory << " , "
312- << " ExecutionUnits " << resources.ExecutionUnits << " ." );
351+ LOG_AS_D (" TxId: " << tx->TxId << " , taskId: " << task->TaskId
352+ << " . Released resources, "
353+ << " Memory: " << resources.Memory << " , "
354+ << " Free Tier: " << resources.ExternalMemory << " , "
355+ << " ExecutionUnits: " << resources.ExecutionUnits << " ." );
313356
314357 FireResourcesPublishing ();
315358 }
@@ -346,7 +389,7 @@ class TKqpResourceManager : public IKqpResourceManager {
346389
347390 with_lock (Lock) {
348391 result.ExecutionUnits = ExecutionUnitsResource.load ();
349- result.Memory [EKqpMemoryPool::ScanQuery] = ScanQueryMemoryResource. Available ();
392+ result.Memory [EKqpMemoryPool::ScanQuery] = TotalMemoryResource-> Available ();
350393 }
351394
352395 return result;
@@ -435,7 +478,8 @@ class TKqpResourceManager : public IKqpResourceManager {
435478 // limits (guarded by Lock)
436479 std::atomic<i32 > ExecutionUnitsResource;
437480 std::atomic<i32 > ExecutionUnitsLimit;
438- TLimitedResource<ui64> ScanQueryMemoryResource;
481+ std::atomic<ui64> SpillingPercent;
482+ TIntrusivePtr<TLimitedResource<ui64>> TotalMemoryResource;
439483 std::atomic<i64 > ExternalDataQueryMemory = 0 ;
440484
441485 // current state
@@ -448,6 +492,8 @@ class TKqpResourceManager : public IKqpResourceManager {
448492 // state for resource info exchanger
449493 std::shared_ptr<TResourceSnapshotState> ResourceSnapshotState;
450494 TActorId ResourceInfoExchanger = TActorId();
495+
496+ absl::flat_hash_map<TString, TIntrusivePtr<TLimitedResource<ui64>>> MemoryNamedPools;
451497};
452498
453499struct TResourceManagers {
@@ -594,7 +640,7 @@ class TKqpResourceManagerActor : public TActorBootstrapped<TKqpResourceManagerAc
594640
595641 if (queueConfig.GetLimit ().GetMemory () > 0 ) {
596642 with_lock (ResourceManager->Lock ) {
597- ResourceManager->ScanQueryMemoryResource . SetNewLimit (queueConfig.GetLimit ().GetMemory ());
643+ ResourceManager->TotalMemoryResource -> SetNewLimit (queueConfig.GetLimit ().GetMemory ());
598644 }
599645 LOG_I (" Total node memory for scan queries: " << queueConfig.GetLimit ().GetMemory () << " bytes" );
600646 }
@@ -711,7 +757,7 @@ class TKqpResourceManagerActor : public TActorBootstrapped<TKqpResourceManagerAc
711757 PRE () {
712758 str << " State storage key: " << WbState.Tenant << Endl;
713759 with_lock (ResourceManager->Lock ) {
714- str << " ScanQuery memory resource: " << ResourceManager->ScanQueryMemoryResource . ToString () << Endl;
760+ str << " ScanQuery memory resource: " << ResourceManager->TotalMemoryResource -> ToString () << Endl;
715761 str << " External DataQuery memory: " << ResourceManager->ExternalDataQueryMemory .load () << Endl;
716762 str << " ExecutionUnits resource: " << ResourceManager->ExecutionUnitsResource .load () << Endl;
717763 }
@@ -806,13 +852,13 @@ class TKqpResourceManagerActor : public TActorBootstrapped<TKqpResourceManagerAc
806852 ActorIdToProto (MakeKqpResourceManagerServiceID (SelfId ().NodeId ()), payload.MutableResourceManagerActorId ()); // legacy
807853 with_lock (ResourceManager->Lock ) {
808854 payload.SetAvailableComputeActors (ResourceManager->ExecutionUnitsResource .load ()); // legacy
809- payload.SetTotalMemory (ResourceManager->ScanQueryMemoryResource . GetLimit ()); // legacy
810- payload.SetUsedMemory (ResourceManager->ScanQueryMemoryResource . GetLimit () - ResourceManager->ScanQueryMemoryResource . Available ()); // legacy
855+ payload.SetTotalMemory (ResourceManager->TotalMemoryResource -> GetLimit ()); // legacy
856+ payload.SetUsedMemory (ResourceManager->TotalMemoryResource -> GetLimit () - ResourceManager->TotalMemoryResource -> Available ()); // legacy
811857
812858 payload.SetExecutionUnits (ResourceManager->ExecutionUnitsResource .load ());
813859 auto * pool = payload.MutableMemory ()->Add ();
814860 pool->SetPool (EKqpMemoryPool::ScanQuery);
815- pool->SetAvailable (ResourceManager->ScanQueryMemoryResource . Available ());
861+ pool->SetAvailable (ResourceManager->TotalMemoryResource -> Available ());
816862 }
817863
818864 LOG_I (" Send to publish resource usage for "
0 commit comments