diff --git a/ydb/core/kqp/compute_actor/kqp_scan_compute_manager.h b/ydb/core/kqp/compute_actor/kqp_scan_compute_manager.h index 1e4d9ac58937..2d684d2f6b09 100644 --- a/ydb/core/kqp/compute_actor/kqp_scan_compute_manager.h +++ b/ydb/core/kqp/compute_actor/kqp_scan_compute_manager.h @@ -23,6 +23,7 @@ class TComputeTaskData; class TShardScannerInfo { private: std::optional ActorId; + const ui64 ScanId; const ui64 TabletId; const ui64 Generation; i64 DataChunksInFlightCount = 0; @@ -51,15 +52,16 @@ class TShardScannerInfo { } } public: - TShardScannerInfo(TShardState& state, const IExternalObjectsProvider& externalObjectsProvider) - : TabletId(state.TabletId) + TShardScannerInfo(const ui64 scanId, TShardState& state, const IExternalObjectsProvider& externalObjectsProvider) + : ScanId(scanId) + , TabletId(state.TabletId) , Generation(++state.Generation) { const bool subscribed = std::exchange(state.SubscribedOnTablet, true); const auto& keyColumnTypes = externalObjectsProvider.GetKeyColumnTypes(); auto ranges = state.GetScanRanges(keyColumnTypes); - auto ev = externalObjectsProvider.BuildEvKqpScan(0, Generation, ranges); + auto ev = externalObjectsProvider.BuildEvKqpScan(ScanId, Generation, ranges); AFL_DEBUG(NKikimrServices::KQP_COMPUTE)("event", "start_scanner")("tablet_id", TabletId)("generation", Generation) ("info", state.ToString(keyColumnTypes))("range", DebugPrintRanges(keyColumnTypes, ranges, *AppData()->TypeRegistry)) @@ -250,6 +252,7 @@ class TInFlightShards: public NComputeActor::TScanShardsStatistics { THashMap ShardsByActorId; bool IsActiveFlag = true; THashMap> ShardScanners; + const ui64 ScanId; const IExternalObjectsProvider& ExternalObjectsProvider; public: @@ -313,7 +316,7 @@ class TInFlightShards: public NComputeActor::TScanShardsStatistics { AFL_ENSURE(state.TabletId); AFL_ENSURE(!state.ActorId)("actor_id", state.ActorId); state.State = NComputeActor::EShardState::Starting; - auto newScanner = std::make_shared(state, ExternalObjectsProvider); + auto newScanner = std::make_shared(ScanId, state, ExternalObjectsProvider); AFL_ENSURE(ShardScanners.emplace(state.TabletId, newScanner).second); } @@ -356,8 +359,9 @@ class TInFlightShards: public NComputeActor::TScanShardsStatistics { return nullptr; } - TInFlightShards(const IExternalObjectsProvider& externalObjectsProvider) - : ExternalObjectsProvider(externalObjectsProvider) + TInFlightShards(const ui64 scanId, const IExternalObjectsProvider& externalObjectsProvider) + : ScanId(scanId) + , ExternalObjectsProvider(externalObjectsProvider) { } bool IsActive() const { diff --git a/ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.cpp b/ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.cpp index 4d4bfed29c89..89485b792dc1 100644 --- a/ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.cpp +++ b/ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.cpp @@ -34,7 +34,7 @@ TKqpScanFetcherActor::TKqpScanFetcherActor(const NKikimrKqp::TKqpSnapshot& snaps , Snapshot(snapshot) , ShardsScanningPolicy(shardsScanningPolicy) , Counters(counters) - , InFlightShards(*this) + , InFlightShards(ScanId, *this) , InFlightComputes(ComputeActorIds) { Y_UNUSED(traceId); @@ -86,7 +86,11 @@ void TKqpScanFetcherActor::Bootstrap() { void TKqpScanFetcherActor::HandleExecute(TEvScanExchange::TEvAckData::TPtr& ev) { Y_ABORT_UNLESS(ev->Get()->GetFreeSpace()); - ALS_DEBUG(NKikimrServices::KQP_COMPUTE) << "EvAckData (" << SelfId() << "): " << ev->Sender; + AFL_DEBUG(NKikimrServices::KQP_COMPUTE)("event", "AckDataFromCompute")("self_id", SelfId())("scan_id", ScanId) + ("packs_to_send", InFlightComputes.GetPacksToSendCount()) + ("from", ev->Sender)("shards remain", PendingShards.size()) + ("in flight scans", InFlightShards.GetScansCount()) + ("in flight shards", InFlightShards.GetShardsCount()); InFlightComputes.OnComputeAck(ev->Sender, ev->Get()->GetFreeSpace()); CheckFinish(); } @@ -458,12 +462,13 @@ void TKqpScanFetcherActor::ProcessPendingScanDataItem(TEvKqpCompute::TEvScanData state->LastKey = std::move(msg.LastKey); const ui64 rowsCount = msg.GetRowsCount(); - CA_LOG_D("action=got EvScanData;rows=" << rowsCount << ";finished=" << msg.Finished << ";exceeded=" << msg.RequestedBytesLimitReached - << ";from=" << ev->Sender << ";shards remain=" << PendingShards.size() - << ";in flight scans=" << InFlightShards.GetScansCount() - << ";in flight shards=" << InFlightShards.GetShardsCount() - << ";delayed_for=" << latency.SecondsFloat() << " seconds by ratelimiter" - << ";tablet_id=" << state->TabletId); + AFL_DEBUG(NKikimrServices::KQP_COMPUTE)("action","got EvScanData")("rows", rowsCount)("finished", msg.Finished)("exceeded", msg.RequestedBytesLimitReached) + ("scan", ScanId)("packs_to_send", InFlightComputes.GetPacksToSendCount()) + ("from", ev->Sender)("shards remain", PendingShards.size()) + ("in flight scans", InFlightShards.GetScansCount()) + ("in flight shards", InFlightShards.GetShardsCount()) + ("delayed_for_seconds_by_ratelimiter", latency.SecondsFloat()) + ("tablet_id", state->TabletId); auto shardScanner = InFlightShards.GetShardScannerVerified(state->TabletId); auto tasksForCompute = shardScanner->OnReceiveData(msg, shardScanner); AFL_ENSURE(tasksForCompute.size() == 1 || tasksForCompute.size() == 0 || tasksForCompute.size() == ComputeActorIds.size())("size", tasksForCompute.size())("compute_size", ComputeActorIds.size()); diff --git a/ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.h b/ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.h index 962aee326470..98c839be624c 100644 --- a/ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.h +++ b/ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.h @@ -168,6 +168,9 @@ class TKqpScanFetcherActor: public NActors::TActorBootstrapped PendingShards; std::deque PendingResolveShards; + static inline TAtomicCounter ScanIdCounter = 0; + const ui64 ScanId = ScanIdCounter.Inc(); + TInFlightShards InFlightShards; TInFlightComputes InFlightComputes; ui32 TotalRetries = 0; diff --git a/ydb/core/tx/columnshard/engines/reader/abstract/read_context.h b/ydb/core/tx/columnshard/engines/reader/abstract/read_context.h index 94f108350e69..c596c7e46026 100644 --- a/ydb/core/tx/columnshard/engines/reader/abstract/read_context.h +++ b/ydb/core/tx/columnshard/engines/reader/abstract/read_context.h @@ -1,10 +1,12 @@ #pragma once #include "read_metadata.h" + #include -#include -#include #include +#include #include +#include + #include namespace NKikimr::NOlap::NReader { @@ -13,6 +15,7 @@ class TComputeShardingPolicy { private: YDB_READONLY(ui32, ShardsCount, 0); YDB_READONLY_DEF(std::vector, ColumnNames); + public: TString DebugString() const { return TStringBuilder() << "shards_count:" << ShardsCount << ";columns=" << JoinSeq(",", ColumnNames) << ";"; @@ -42,10 +45,12 @@ class TReadContext { const NColumnShard::TConcreteScanCounters Counters; TReadMetadataBase::TConstPtr ReadMetadata; NResourceBroker::NSubscribe::TTaskContext ResourcesTaskContext; + const ui64 ScanId; const TActorId ScanActorId; const TActorId ResourceSubscribeActorId; const TActorId ReadCoordinatorActorId; const TComputeShardingPolicy ComputeShardingPolicy; + public: template std::shared_ptr GetReadMetadataPtrVerifiedAs() const { @@ -74,6 +79,10 @@ class TReadContext { return ScanActorId; } + ui64 GetScanId() const { + return ScanId; + } + const TReadMetadataBase::TConstPtr& GetReadMetadata() const { return ReadMetadata; } @@ -86,17 +95,18 @@ class TReadContext { return ResourcesTaskContext; } - TReadContext(const std::shared_ptr& storagesManager, const NColumnShard::TConcreteScanCounters& counters, const TReadMetadataBase::TConstPtr& readMetadata, - const TActorId& scanActorId, const TActorId& resourceSubscribeActorId, const TActorId& readCoordinatorActorId, const TComputeShardingPolicy& computeShardingPolicy) + TReadContext(const std::shared_ptr& storagesManager, const NColumnShard::TConcreteScanCounters& counters, + const TReadMetadataBase::TConstPtr& readMetadata, const TActorId& scanActorId, const TActorId& resourceSubscribeActorId, + const TActorId& readCoordinatorActorId, const TComputeShardingPolicy& computeShardingPolicy, const ui64 scanId) : StoragesManager(storagesManager) , Counters(counters) , ReadMetadata(readMetadata) , ResourcesTaskContext("CS::SCAN_READ", counters.ResourcesSubscriberCounters) + , ScanId(scanId) , ScanActorId(scanActorId) , ResourceSubscribeActorId(resourceSubscribeActorId) , ReadCoordinatorActorId(readCoordinatorActorId) - , ComputeShardingPolicy(computeShardingPolicy) - { + , ComputeShardingPolicy(computeShardingPolicy) { Y_ABORT_UNLESS(ReadMetadata); } }; @@ -111,6 +121,7 @@ class IDataReader { virtual bool DoIsFinished() const = 0; virtual std::vector> DoExtractReadyResults(const int64_t maxRowsInBatch) = 0; virtual TConclusion DoReadNextInterval() = 0; + public: IDataReader(const std::shared_ptr& context); virtual ~IDataReader() = default; @@ -171,4 +182,4 @@ class IDataReader { } }; -} +} // namespace NKikimr::NOlap::NReader diff --git a/ydb/core/tx/columnshard/engines/reader/actor/actor.cpp b/ydb/core/tx/columnshard/engines/reader/actor/actor.cpp index 96de37e50599..351ecca82294 100644 --- a/ydb/core/tx/columnshard/engines/reader/actor/actor.cpp +++ b/ydb/core/tx/columnshard/engines/reader/actor/actor.cpp @@ -80,7 +80,7 @@ void TColumnShardScan::Bootstrap(const TActorContext& ctx) { ReadCoordinatorActorId = ctx.Register(new NBlobOperations::NRead::TReadCoordinatorActor(TabletId, SelfId())); std::shared_ptr context = std::make_shared(StoragesManager, ScanCountersPool, - ReadMetadataRange, SelfId(), ResourceSubscribeActorId, ReadCoordinatorActorId, ComputeShardingPolicy); + ReadMetadataRange, SelfId(), ResourceSubscribeActorId, ReadCoordinatorActorId, ComputeShardingPolicy, ScanId); ScanIterator = ReadMetadataRange->StartScan(context); auto startResult = ScanIterator->Start(); StartInstant = TMonotonic::Now(); diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/context.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/context.cpp index f759fca64f74..d04188a4d9d6 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/context.cpp +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/context.cpp @@ -286,6 +286,8 @@ TSpecialReadContext::TSpecialReadContext(const std::shared_ptr& co }; ProcessMemoryGuard = NGroupedMemoryManager::TScanMemoryLimiterOperator::BuildProcessGuard(CommonContext->GetReadMetadata()->GetTxId(), stages); + ProcessScopeGuard = + NGroupedMemoryManager::TScanMemoryLimiterOperator::BuildScopeGuard(CommonContext->GetReadMetadata()->GetTxId(), GetCommonContext()->GetScanId()); auto readSchema = ReadMetadata->GetResultSchema(); SpecColumns = std::make_shared(TIndexInfo::GetSnapshotColumnIdsSet(), readSchema); diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/context.h b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/context.h index 9f6952dd323b..211f837074a0 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/context.h +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/context.h @@ -15,6 +15,7 @@ class TSpecialReadContext { private: YDB_READONLY_DEF(std::shared_ptr, CommonContext); YDB_READONLY_DEF(std::shared_ptr, ProcessMemoryGuard); + YDB_READONLY_DEF(std::shared_ptr, ProcessScopeGuard); YDB_READONLY_DEF(std::shared_ptr, SpecColumns); YDB_READONLY_DEF(std::shared_ptr, MergeColumns); diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/fetching.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/fetching.cpp index ecc0ca754479..90e5b5a7b28c 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/fetching.cpp +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/fetching.cpp @@ -222,8 +222,8 @@ TConclusion TAllocateMemoryStep::DoExecuteInplace( const std::shared_ptr& source, const TFetchingScriptCursor& step) const { auto allocation = std::make_shared(source, GetProcessingDataSize(source), step); - NGroupedMemoryManager::TScanMemoryLimiterOperator::SendToAllocation( - source->GetContext()->GetProcessMemoryControlId(), source->GetFirstIntervalId(), { allocation }, (ui32)StageIndex); + NGroupedMemoryManager::TScanMemoryLimiterOperator::SendToAllocation(source->GetContext()->GetProcessMemoryControlId(), + source->GetContext()->GetCommonContext()->GetScanId(), source->GetFirstIntervalId(), { allocation }, (ui32)StageIndex); return false; } diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/interval.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/interval.cpp index 1dee839137c3..9da043a366c1 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/interval.cpp +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/interval.cpp @@ -22,8 +22,8 @@ void TFetchingInterval::ConstructResult() { auto task = std::make_shared(MergingContext, Context, std::move(Sources)); task->SetPriority(NConveyor::ITask::EPriority::High); - NGroupedMemoryManager::TScanMemoryLimiterOperator::SendToAllocation( - Context->GetProcessMemoryControlId(), GetIntervalId(), { task }, (ui32)EStageFeaturesIndexes::Merge); + NGroupedMemoryManager::TScanMemoryLimiterOperator::SendToAllocation(Context->GetProcessMemoryControlId(), + Context->GetCommonContext()->GetScanId(), GetIntervalId(), { task }, (ui32)EStageFeaturesIndexes::Merge); } } @@ -41,14 +41,16 @@ TFetchingInterval::TFetchingInterval(const NArrow::NMerger::TSortableBatchPositi , TaskGuard(Context->GetCommonContext()->GetCounters().GetResourcesAllocationTasksGuard()) , Sources(sources) , IntervalIdx(intervalIdx) - , IntervalGroupGuard(NGroupedMemoryManager::TScanMemoryLimiterOperator::BuildGroupGuard(Context->GetProcessMemoryControlId())) + , IntervalGroupGuard(NGroupedMemoryManager::TScanMemoryLimiterOperator::BuildGroupGuard( + Context->GetProcessMemoryControlId(), context->GetCommonContext()->GetScanId())) , IntervalStateGuard(Context->GetCommonContext()->GetCounters().CreateIntervalStateGuard()) { AFL_VERIFY(Sources.size()); for (auto&& [_, i] : Sources) { if (!i->IsDataReady()) { ++WaitSourcesCount; } else { - AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "ready_source")("interval_idx", IntervalIdx)("interval_id", GetIntervalId()); + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "ready_source")("interval_idx", IntervalIdx)( + "interval_id", GetIntervalId()); } AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "register_source")("interval_idx", IntervalIdx)("interval_id", GetIntervalId()); i->RegisterInterval(*this, i); @@ -81,8 +83,8 @@ void TFetchingInterval::OnPartSendingComplete() { auto task = std::make_shared(MergingContext, Context, std::move(Merger)); task->SetPriority(NConveyor::ITask::EPriority::High); - NGroupedMemoryManager::TScanMemoryLimiterOperator::SendToAllocation( - Context->GetProcessMemoryControlId(), GetIntervalId(), { task }, (ui32)EStageFeaturesIndexes::Merge); + NGroupedMemoryManager::TScanMemoryLimiterOperator::SendToAllocation(Context->GetProcessMemoryControlId(), + Context->GetCommonContext()->GetScanId(), GetIntervalId(), { task }, (ui32)EStageFeaturesIndexes::Merge); } } // namespace NKikimr::NOlap::NReader::NPlain diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/scanner.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/scanner.cpp index d8204201837c..9062602df80f 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/scanner.cpp +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/scanner.cpp @@ -38,11 +38,11 @@ void TScanHead::OnIntervalResult(std::shared_ptrHasMerger())("intervalId", interval->GetIntervalId()); + "merger", interval->HasMerger())("interval_id", interval->GetIntervalId()); break; } else { AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "interval_result")("interval_idx", intervalIdx)("count", - it->second ? it->second->GetRecordsCount() : 0)("merger", interval->HasMerger())("intervalId", interval->GetIntervalId()); + it->second ? it->second->GetRecordsCount() : 0)("merger", interval->HasMerger())("interval_id", interval->GetIntervalId()); } auto result = it->second; ReadyIntervals.erase(it); diff --git a/ydb/core/tx/limiter/grouped_memory/service/actor.cpp b/ydb/core/tx/limiter/grouped_memory/service/actor.cpp index 1112c7f7aaef..e7573c23612e 100644 --- a/ydb/core/tx/limiter/grouped_memory/service/actor.cpp +++ b/ydb/core/tx/limiter/grouped_memory/service/actor.cpp @@ -9,24 +9,26 @@ void TMemoryLimiterActor::Bootstrap() { void TMemoryLimiterActor::Handle(NEvents::TEvExternal::TEvStartTask::TPtr& ev) { for (auto&& i : ev->Get()->GetAllocations()) { - Manager->RegisterAllocation(ev->Get()->GetExternalProcessId(), ev->Get()->GetExternalGroupId(), i, ev->Get()->GetStageFeaturesIdx()); + Manager->RegisterAllocation(ev->Get()->GetExternalProcessId(), ev->Get()->GetExternalScopeId(), ev->Get()->GetExternalGroupId(), i, + ev->Get()->GetStageFeaturesIdx()); } } void TMemoryLimiterActor::Handle(NEvents::TEvExternal::TEvFinishTask::TPtr& ev) { - Manager->UnregisterAllocation(ev->Get()->GetExternalProcessId(), ev->Get()->GetAllocationId()); + Manager->UnregisterAllocation(ev->Get()->GetExternalProcessId(), ev->Get()->GetExternalScopeId(), ev->Get()->GetAllocationId()); } void TMemoryLimiterActor::Handle(NEvents::TEvExternal::TEvUpdateTask::TPtr& ev) { - Manager->UpdateAllocation(ev->Get()->GetExternalProcessId(), ev->Get()->GetAllocationId(), ev->Get()->GetVolume()); + Manager->UpdateAllocation( + ev->Get()->GetExternalProcessId(), ev->Get()->GetExternalScopeId(), ev->Get()->GetAllocationId(), ev->Get()->GetVolume()); } void TMemoryLimiterActor::Handle(NEvents::TEvExternal::TEvFinishGroup::TPtr& ev) { - Manager->UnregisterGroup(ev->Get()->GetExternalProcessId(), ev->Get()->GetExternalGroupId()); + Manager->UnregisterGroup(ev->Get()->GetExternalProcessId(), ev->Get()->GetExternalScopeId(), ev->Get()->GetExternalGroupId()); } void TMemoryLimiterActor::Handle(NEvents::TEvExternal::TEvStartGroup::TPtr& ev) { - Manager->RegisterGroup(ev->Get()->GetExternalProcessId(), ev->Get()->GetExternalGroupId()); + Manager->RegisterGroup(ev->Get()->GetExternalProcessId(), ev->Get()->GetExternalScopeId(), ev->Get()->GetExternalGroupId()); } void TMemoryLimiterActor::Handle(NEvents::TEvExternal::TEvFinishProcess::TPtr& ev) { @@ -37,4 +39,12 @@ void TMemoryLimiterActor::Handle(NEvents::TEvExternal::TEvStartProcess::TPtr& ev Manager->RegisterProcess(ev->Get()->GetExternalProcessId(), ev->Get()->GetStages()); } +void TMemoryLimiterActor::Handle(NEvents::TEvExternal::TEvFinishProcessScope::TPtr& ev) { + Manager->UnregisterProcessScope(ev->Get()->GetExternalProcessId(), ev->Get()->GetExternalScopeId()); +} + +void TMemoryLimiterActor::Handle(NEvents::TEvExternal::TEvStartProcessScope::TPtr& ev) { + Manager->RegisterProcessScope(ev->Get()->GetExternalProcessId(), ev->Get()->GetExternalScopeId()); +} + } // namespace NKikimr::NOlap::NGroupedMemoryManager diff --git a/ydb/core/tx/limiter/grouped_memory/service/actor.h b/ydb/core/tx/limiter/grouped_memory/service/actor.h index c653c8c6e94a..4b4506ba5b99 100644 --- a/ydb/core/tx/limiter/grouped_memory/service/actor.h +++ b/ydb/core/tx/limiter/grouped_memory/service/actor.h @@ -34,6 +34,8 @@ class TMemoryLimiterActor: public NActors::TActorBootstrappedGetTypeName()); } diff --git a/ydb/core/tx/limiter/grouped_memory/service/allocation.cpp b/ydb/core/tx/limiter/grouped_memory/service/allocation.cpp index a9c5b3585a4c..2d04be2c9cef 100644 --- a/ydb/core/tx/limiter/grouped_memory/service/allocation.cpp +++ b/ydb/core/tx/limiter/grouped_memory/service/allocation.cpp @@ -3,12 +3,14 @@ namespace NKikimr::NOlap::NGroupedMemoryManager { -TAllocationInfo::TAllocationInfo(const ui64 processId, const ui64 allocationInternalGroupId, const std::shared_ptr& allocation, +TAllocationInfo::TAllocationInfo(const ui64 processId, const ui64 scopeId, const ui64 allocationInternalGroupId, + const std::shared_ptr& allocation, const std::shared_ptr& stage) : Allocation(allocation) , AllocationInternalGroupId(allocationInternalGroupId) , Identifier(TValidator::CheckNotNull(Allocation)->GetIdentifier()) , ProcessId(processId) + , ScopeId(scopeId) , Stage(stage) { AFL_VERIFY(Stage); AFL_VERIFY(Allocation); diff --git a/ydb/core/tx/limiter/grouped_memory/service/allocation.h b/ydb/core/tx/limiter/grouped_memory/service/allocation.h index 8c8c45092a0a..47d5043188d0 100644 --- a/ydb/core/tx/limiter/grouped_memory/service/allocation.h +++ b/ydb/core/tx/limiter/grouped_memory/service/allocation.h @@ -16,6 +16,7 @@ class TAllocationInfo { ui64 AllocatedVolume = 0; YDB_READONLY(ui64, Identifier, 0); YDB_READONLY(ui64, ProcessId, 0); + YDB_READONLY(ui64, ScopeId, 0); const std::shared_ptr Stage; bool AllocationFailed = false; @@ -25,7 +26,7 @@ class TAllocationInfo { Stage->Free(AllocatedVolume, GetAllocationStatus() == EAllocationStatus::Allocated); } - AFL_INFO(NKikimrServices::GROUPED_MEMORY_LIMITER)("event", "destroy")("allocation_id", Identifier)("stage", Stage->GetName()); + AFL_TRACE(NKikimrServices::GROUPED_MEMORY_LIMITER)("event", "destroy")("allocation_id", Identifier)("stage", Stage->GetName()); } bool IsAllocatable(const ui64 additional) const { @@ -43,11 +44,11 @@ class TAllocationInfo { } [[nodiscard]] bool Allocate(const NActors::TActorId& ownerId) { - AFL_INFO(NKikimrServices::GROUPED_MEMORY_LIMITER)("event", "allocated")("allocation_id", Identifier)("stage", Stage->GetName()); + AFL_TRACE(NKikimrServices::GROUPED_MEMORY_LIMITER)("event", "allocated")("allocation_id", Identifier)("stage", Stage->GetName()); AFL_VERIFY(Allocation)("status", GetAllocationStatus())("volume", AllocatedVolume)("id", Identifier)("stage", Stage->GetName())( "allocation_internal_group_id", AllocationInternalGroupId); const bool result = Allocation->OnAllocated( - std::make_shared(ProcessId, Allocation->GetIdentifier(), ownerId, Allocation->GetMemory()), Allocation); + std::make_shared(ProcessId, ScopeId, Allocation->GetIdentifier(), ownerId, Allocation->GetMemory()), Allocation); if (result) { Stage->Allocate(AllocatedVolume); } else { @@ -68,7 +69,7 @@ class TAllocationInfo { } } - TAllocationInfo(const ui64 processId, const ui64 allocationInternalGroupId, const std::shared_ptr& allocation, + TAllocationInfo(const ui64 processId, const ui64 scopeId, const ui64 allocationInternalGroupId, const std::shared_ptr& allocation, const std::shared_ptr& stage); }; diff --git a/ydb/core/tx/limiter/grouped_memory/service/group.cpp b/ydb/core/tx/limiter/grouped_memory/service/group.cpp index 288ca98d31d1..3bf671ff76b9 100644 --- a/ydb/core/tx/limiter/grouped_memory/service/group.cpp +++ b/ydb/core/tx/limiter/grouped_memory/service/group.cpp @@ -19,13 +19,16 @@ std::vector> TGrouppedAllocations::AllocatePoss return result; } -bool TAllocationGroups::Allocate(TProcessMemory& process, const ui32 allocationsLimit) { +bool TAllocationGroups::Allocate(const bool isPriorityProcess, TProcessMemoryScope& process, const ui32 allocationsLimit) { + AFL_DEBUG(NKikimrServices::GROUPED_MEMORY_LIMITER)("event", "try_allocation")("limit", allocationsLimit)( + "external_process_id", process.ExternalProcessId)("forced_internal_group_id", process.GroupIds.GetMinInternalIdOptional())( + "external_scope_id", process.ExternalScopeId)("forced_external_group_id", process.GroupIds.GetMinExternalIdOptional()); ui32 allocationsCount = 0; while (true) { std::vector toRemove; for (auto it = Groups.begin(); it != Groups.end();) { const ui64 internalGroupId = it->first; - const bool forced = process.IsPriorityProcess() && internalGroupId == process.GroupIds.GetMinInternalIdVerified(); + const bool forced = isPriorityProcess && internalGroupId == process.GroupIds.GetMinInternalIdVerified(); std::vector> allocated; if (forced) { allocated = it->second.ExtractAllocationsToVector(); diff --git a/ydb/core/tx/limiter/grouped_memory/service/group.h b/ydb/core/tx/limiter/grouped_memory/service/group.h index 52d2665ce254..8f4434c886b8 100644 --- a/ydb/core/tx/limiter/grouped_memory/service/group.h +++ b/ydb/core/tx/limiter/grouped_memory/service/group.h @@ -3,7 +3,7 @@ namespace NKikimr::NOlap::NGroupedMemoryManager { -class TProcessMemory; +class TProcessMemoryScope; class TGrouppedAllocations { private: @@ -48,7 +48,7 @@ class TAllocationGroups { return Groups.empty(); } - [[nodiscard]] bool Allocate(TProcessMemory& process, const ui32 allocationsLimit); + [[nodiscard]] bool Allocate(const bool isPriorityProcess, TProcessMemoryScope& process, const ui32 allocationsLimit); [[nodiscard]] std::vector> ExtractGroup(const ui64 id) { auto it = Groups.find(id); diff --git a/ydb/core/tx/limiter/grouped_memory/service/ids.cpp b/ydb/core/tx/limiter/grouped_memory/service/ids.cpp index 5185230efad0..163c90efcf12 100644 --- a/ydb/core/tx/limiter/grouped_memory/service/ids.cpp +++ b/ydb/core/tx/limiter/grouped_memory/service/ids.cpp @@ -7,7 +7,7 @@ ui64 TIdsControl::ExtractInternalIdVerified(const ui64 externalId) { auto it = ExternalIdIntoInternalId.find(externalId); AFL_VERIFY(it != ExternalIdIntoInternalId.end())("external_id", externalId); const ui64 result = it->second; - InternalIds.erase(result); + InternalIdIntoExternalId.erase(result); ExternalIdIntoInternalId.erase(it); return result; } @@ -21,8 +21,8 @@ std::optional TIdsControl::GetInternalIdOptional(const ui64 externalId) co } ui64 TIdsControl::GetMinInternalIdVerified() const { - AFL_VERIFY(InternalIds.size()); - return *InternalIds.begin(); + AFL_VERIFY(InternalIdIntoExternalId.size()); + return InternalIdIntoExternalId.begin()->first; } ui64 TIdsControl::GetInternalIdVerified(const ui64 externalId) const { @@ -33,7 +33,7 @@ ui64 TIdsControl::GetInternalIdVerified(const ui64 externalId) const { ui64 TIdsControl::RegisterExternalId(const ui64 externalId) { AFL_VERIFY(ExternalIdIntoInternalId.emplace(externalId, ++CurrentInternalId).second); - InternalIds.emplace(CurrentInternalId); + InternalIdIntoExternalId.emplace(CurrentInternalId, externalId); return CurrentInternalId; } @@ -43,7 +43,7 @@ ui64 TIdsControl::RegisterExternalIdOrGet(const ui64 externalId) { return it->second; } AFL_VERIFY(ExternalIdIntoInternalId.emplace(externalId, ++CurrentInternalId).second); - InternalIds.emplace(CurrentInternalId); + InternalIdIntoExternalId.emplace(CurrentInternalId, externalId); return CurrentInternalId; } @@ -52,9 +52,15 @@ bool TIdsControl::UnregisterExternalId(const ui64 externalId) { if (it == ExternalIdIntoInternalId.end()) { return false; } - AFL_VERIFY(InternalIds.erase(it->second)); + AFL_VERIFY(InternalIdIntoExternalId.erase(it->second)); ExternalIdIntoInternalId.erase(it); return true; } +ui64 TIdsControl::GetExternalIdVerified(const ui64 internalId) const { + auto it = InternalIdIntoExternalId.find(internalId); + AFL_VERIFY(it != InternalIdIntoExternalId.end()); + return it->second; +} + } // namespace NKikimr::NOlap::NGroupedMemoryManager diff --git a/ydb/core/tx/limiter/grouped_memory/service/ids.h b/ydb/core/tx/limiter/grouped_memory/service/ids.h index 19ce04f95367..acaa700411ab 100644 --- a/ydb/core/tx/limiter/grouped_memory/service/ids.h +++ b/ydb/core/tx/limiter/grouped_memory/service/ids.h @@ -1,5 +1,8 @@ #pragma once #include + +#include + #include #include @@ -7,19 +10,28 @@ namespace NKikimr::NOlap::NGroupedMemoryManager { class TIdsControl { private: - std::map ExternalIdIntoInternalId; - YDB_READONLY_DEF(std::set, InternalIds); + THashMap ExternalIdIntoInternalId; + std::map InternalIdIntoExternalId; ui64 CurrentInternalId = 0; public: void Clear() { ExternalIdIntoInternalId.clear(); - InternalIds.clear(); + InternalIdIntoExternalId.clear(); + } + + const std::map& GetInternalIdToExternalIds() const { + return InternalIdIntoExternalId; + } + + ui64 GetSize() const { + return InternalIdIntoExternalId.size(); } [[nodiscard]] ui64 ExtractInternalIdVerified(const ui64 externalId); ui64 GetMinInternalIdVerified() const; + ui64 GetExternalIdVerified(const ui64 internalId) const; std::optional GetInternalIdOptional(const ui64 externalId) const; @@ -31,16 +43,24 @@ class TIdsControl { [[nodiscard]] bool UnregisterExternalId(const ui64 externalId); std::optional GetMinInternalIdOptional() const { - if (InternalIds.size()) { - return *InternalIds.begin(); + if (InternalIdIntoExternalId.size()) { + return InternalIdIntoExternalId.begin()->first; + } else { + return std::nullopt; + } + } + + std::optional GetMinExternalIdOptional() const { + if (InternalIdIntoExternalId.size()) { + return InternalIdIntoExternalId.begin()->second; } else { return std::nullopt; } } ui64 GetMinInternalIdDef(const ui64 def) const { - if (InternalIds.size()) { - return *InternalIds.begin(); + if (InternalIdIntoExternalId.size()) { + return InternalIdIntoExternalId.begin()->first; } else { return def; } diff --git a/ydb/core/tx/limiter/grouped_memory/service/manager.cpp b/ydb/core/tx/limiter/grouped_memory/service/manager.cpp index a6d3ef0d94cf..96fe8bcefc17 100644 --- a/ydb/core/tx/limiter/grouped_memory/service/manager.cpp +++ b/ydb/core/tx/limiter/grouped_memory/service/manager.cpp @@ -12,23 +12,27 @@ TProcessMemory* TManager::GetProcessMemoryByExternalIdOptional(const ui64 extern return GetProcessMemoryOptional(*internalId); } -void TManager::RegisterGroup(const ui64 externalProcessId, const ui64 externalGroupId) { +void TManager::RegisterGroup(const ui64 externalProcessId, const ui64 externalScopeId, const ui64 externalGroupId) { + AFL_DEBUG(NKikimrServices::GROUPED_MEMORY_LIMITER)("event", "register_group")("external_process_id", externalProcessId)( + "external_group_id", externalGroupId)("size", ProcessIds.GetSize())("external_scope_id", externalScopeId); if (auto* process = GetProcessMemoryByExternalIdOptional(externalProcessId)) { - process->RegisterGroup(externalGroupId); + process->RegisterGroup(externalScopeId, externalGroupId); } RefreshSignals(); } -void TManager::UnregisterGroup(const ui64 externalProcessId, const ui64 externalGroupId) { +void TManager::UnregisterGroup(const ui64 externalProcessId, const ui64 externalScopeId, const ui64 externalGroupId) { + AFL_DEBUG(NKikimrServices::GROUPED_MEMORY_LIMITER)("event", "unregister_group")("external_process_id", externalProcessId)( + "external_group_id", externalGroupId)("size", ProcessIds.GetSize()); if (auto* process = GetProcessMemoryByExternalIdOptional(externalProcessId)) { - process->UnregisterGroup(externalGroupId); + process->UnregisterGroup(externalScopeId, externalGroupId); } RefreshSignals(); } -void TManager::UpdateAllocation(const ui64 externalProcessId, const ui64 allocationId, const ui64 volume) { +void TManager::UpdateAllocation(const ui64 externalProcessId, const ui64 externalScopeId, const ui64 allocationId, const ui64 volume) { TProcessMemory& process = GetProcessMemoryVerified(ProcessIds.GetInternalIdVerified(externalProcessId)); - if (process.UpdateAllocation(allocationId, volume)) { + if (process.UpdateAllocation(externalScopeId, allocationId, volume)) { TryAllocateWaiting(); } @@ -56,21 +60,21 @@ void TManager::TryAllocateWaiting() { RefreshSignals(); } -void TManager::UnregisterAllocation(const ui64 externalProcessId, const ui64 allocationId) { +void TManager::UnregisterAllocation(const ui64 externalProcessId, const ui64 externalScopeId, const ui64 allocationId) { if (auto* process = GetProcessMemoryByExternalIdOptional(externalProcessId)) { - if (process->UnregisterAllocation(allocationId)) { + if (process->UnregisterAllocation(externalScopeId, allocationId)) { TryAllocateWaiting(); } } RefreshSignals(); } -void TManager::RegisterAllocation( - const ui64 externalProcessId, const ui64 externalGroupId, const std::shared_ptr& task, const std::optional& stageIdx) { +void TManager::RegisterAllocation(const ui64 externalProcessId, const ui64 externalScopeId, const ui64 externalGroupId, + const std::shared_ptr& task, const std::optional& stageIdx) { if (auto* process = GetProcessMemoryByExternalIdOptional(externalProcessId)) { - process->RegisterAllocation(externalGroupId, task, stageIdx); + process->RegisterAllocation(externalScopeId, externalGroupId, task, stageIdx); } else { - AFL_VERIFY(!task->OnAllocated(std::make_shared(externalProcessId, task->GetIdentifier(), OwnerActorId, task->GetMemory()), task))( + AFL_VERIFY(!task->OnAllocated(std::make_shared(externalProcessId, externalScopeId, task->GetIdentifier(), OwnerActorId, task->GetMemory()), task))( "ext_group", externalGroupId)("stage_idx", stageIdx); } RefreshSignals(); @@ -105,4 +109,14 @@ void TManager::UnregisterProcess(const ui64 externalProcessId) { RefreshSignals(); } +void TManager::RegisterProcessScope(const ui64 externalProcessId, const ui64 externalProcessScopeId) { + GetProcessMemoryVerified(ProcessIds.GetInternalIdVerified(externalProcessId)).RegisterScope(externalProcessScopeId); + RefreshSignals(); +} + +void TManager::UnregisterProcessScope(const ui64 externalProcessId, const ui64 externalProcessScopeId) { + GetProcessMemoryVerified(ProcessIds.GetInternalIdVerified(externalProcessId)).UnregisterScope(externalProcessScopeId); + RefreshSignals(); +} + } // namespace NKikimr::NOlap::NGroupedMemoryManager diff --git a/ydb/core/tx/limiter/grouped_memory/service/manager.h b/ydb/core/tx/limiter/grouped_memory/service/manager.h index 7e64b83e1167..fd641a3f69b1 100644 --- a/ydb/core/tx/limiter/grouped_memory/service/manager.h +++ b/ydb/core/tx/limiter/grouped_memory/service/manager.h @@ -56,16 +56,20 @@ class TManager { { } - void RegisterGroup(const ui64 externalProcessId, const ui64 externalGroupId); - void UnregisterGroup(const ui64 externalProcessId, const ui64 externalGroupId); + void RegisterGroup(const ui64 externalProcessId, const ui64 externalScopeId, const ui64 externalGroupId); + void UnregisterGroup(const ui64 externalProcessId, const ui64 externalScopeId, const ui64 externalGroupId); + + void RegisterProcessScope(const ui64 externalProcessId, const ui64 externalScopeId); + void UnregisterProcessScope(const ui64 externalProcessId, const ui64 externalScopeId); void RegisterProcess(const ui64 externalProcessId, const std::vector>& stages); void UnregisterProcess(const ui64 externalProcessId); - void RegisterAllocation(const ui64 externalProcessId, const ui64 externalGroupId, const std::shared_ptr& task, + void RegisterAllocation(const ui64 externalProcessId, const ui64 externalScopeId, const ui64 externalGroupId, + const std::shared_ptr& task, const std::optional& stageIdx); - void UnregisterAllocation(const ui64 externalProcessId, const ui64 allocationId); - void UpdateAllocation(const ui64 externalProcessId, const ui64 allocationId, const ui64 volume); + void UnregisterAllocation(const ui64 externalProcessId, const ui64 externalScopeId, const ui64 allocationId); + void UpdateAllocation(const ui64 externalProcessId, const ui64 externalScopeId, const ui64 allocationId, const ui64 volume); bool IsEmpty() const { return Processes.empty(); diff --git a/ydb/core/tx/limiter/grouped_memory/service/process.cpp b/ydb/core/tx/limiter/grouped_memory/service/process.cpp index c928cbc56849..bcde6532e797 100644 --- a/ydb/core/tx/limiter/grouped_memory/service/process.cpp +++ b/ydb/core/tx/limiter/grouped_memory/service/process.cpp @@ -2,102 +2,4 @@ namespace NKikimr::NOlap::NGroupedMemoryManager { -void TProcessMemory::RegisterAllocation( - const ui64 externalGroupId, const std::shared_ptr& task, const std::optional& stageIdx) { - AFL_VERIFY(task); - std::shared_ptr stage; - if (Stages.empty()) { - AFL_VERIFY(!stageIdx); - stage = DefaultStage; - } else { - AFL_VERIFY(stageIdx); - AFL_VERIFY(*stageIdx < Stages.size()); - stage = Stages[*stageIdx]; - } - AFL_VERIFY(stage); - const std::optional internalGroupIdOptional = GroupIds.GetInternalIdOptional(externalGroupId); - if (!internalGroupIdOptional) { - AFL_VERIFY(!task->OnAllocated(std::make_shared(ExternalProcessId, task->GetIdentifier(), OwnerActorId, task->GetMemory()), task))("ext_group", - externalGroupId)("min_group", GroupIds.GetMinInternalIdOptional())( - "stage", stage->GetName()); - AFL_VERIFY(!AllocationInfo.contains(task->GetIdentifier())); - } else { - const ui64 internalGroupId = *internalGroupIdOptional; - auto allocationInfo = RegisterAllocationImpl(internalGroupId, task, stage); - - if (allocationInfo->GetAllocationStatus() != EAllocationStatus::Waiting) { - } else if (WaitAllocations.GetMinGroupId().value_or(internalGroupId) < internalGroupId) { - WaitAllocations.AddAllocation(internalGroupId, allocationInfo); - } else if (allocationInfo->IsAllocatable(0) || (IsPriorityProcess() && internalGroupId == GroupIds.GetMinInternalIdVerified())) { - Y_UNUSED(WaitAllocations.RemoveAllocation(internalGroupId, allocationInfo)); - if (!allocationInfo->Allocate(OwnerActorId)) { - UnregisterAllocation(allocationInfo->GetIdentifier()); - } - } else { - WaitAllocations.AddAllocation(internalGroupId, allocationInfo); - } - } -} - -const std::shared_ptr& TProcessMemory::RegisterAllocationImpl( - const ui64 internalGroupId, const std::shared_ptr& task, const std::shared_ptr& stage) { - auto it = AllocationInfo.find(task->GetIdentifier()); - if (it == AllocationInfo.end()) { - it = AllocationInfo.emplace(task->GetIdentifier(), std::make_shared(ExternalProcessId, internalGroupId, task, stage)).first; - } - return it->second; -} - -bool TProcessMemory::UnregisterAllocation(const ui64 allocationId) { - ui64 memoryAllocated = 0; - auto it = AllocationInfo.find(allocationId); - AFL_VERIFY(it != AllocationInfo.end()); - bool waitFlag = false; - const ui64 internalGroupId = it->second->GetAllocationInternalGroupId(); - switch (it->second->GetAllocationStatus()) { - case EAllocationStatus::Allocated: - case EAllocationStatus::Failed: - AFL_VERIFY(!WaitAllocations.RemoveAllocation(internalGroupId, it->second)); - break; - case EAllocationStatus::Waiting: - AFL_VERIFY(WaitAllocations.RemoveAllocation(internalGroupId, it->second)); - waitFlag = true; - break; - } - AFL_DEBUG(NKikimrServices::GROUPED_MEMORY_LIMITER)("event", "allocation_unregister")("allocation_id", allocationId)("wait", waitFlag)( - "internal_group_id", internalGroupId)("allocation_status", it->second->GetAllocationStatus()); - memoryAllocated = it->second->GetAllocatedVolume(); - AllocationInfo.erase(it); - return !!memoryAllocated; -} - -void TProcessMemory::UnregisterGroupImpl(const ui64 internalGroupId) { - auto data = WaitAllocations.ExtractGroup(internalGroupId); - for (auto&& allocation : data) { - AFL_VERIFY(!allocation->Allocate(OwnerActorId)); - } -} - -void TProcessMemory::UnregisterGroup(const ui64 externalGroupId) { - const ui64 internalGroupId = GroupIds.ExtractInternalIdVerified(externalGroupId); - AFL_INFO(NKikimrServices::GROUPED_MEMORY_LIMITER)("event", "remove_group")("external_group_id", externalGroupId)( - "internal_group_id", internalGroupId); - UnregisterGroupImpl(internalGroupId); - if (IsPriorityProcess() && (internalGroupId < GroupIds.GetMinInternalIdDef(internalGroupId))) { - Y_UNUSED(TryAllocateWaiting(0)); - } -} - -void TProcessMemory::RegisterGroup(const ui64 externalGroupId) { - Y_UNUSED(GroupIds.RegisterExternalId(externalGroupId)); -} - -void TProcessMemory::Unregister() { - for (auto&& i : GroupIds.GetInternalIds()) { - UnregisterGroupImpl(i); - } - GroupIds.Clear(); - AllocationInfo.clear(); -} - } // namespace NKikimr::NOlap::NGroupedMemoryManager diff --git a/ydb/core/tx/limiter/grouped_memory/service/process.h b/ydb/core/tx/limiter/grouped_memory/service/process.h index 9b7b5e522c6d..a1c13e091d59 100644 --- a/ydb/core/tx/limiter/grouped_memory/service/process.h +++ b/ydb/core/tx/limiter/grouped_memory/service/process.h @@ -2,37 +2,231 @@ #include "group.h" #include "ids.h" +#include + namespace NKikimr::NOlap::NGroupedMemoryManager { -class TProcessMemory { +class TProcessMemoryScope { private: const ui64 ExternalProcessId; + const ui64 ExternalScopeId; TAllocationGroups WaitAllocations; THashMap> AllocationInfo; TIdsControl GroupIds; + ui32 Links = 1; const NActors::TActorId OwnerActorId; - bool PriorityProcessFlag = false; - - const std::shared_ptr& RegisterAllocationImpl( - const ui64 internalGroupId, const std::shared_ptr& task, const std::shared_ptr& stage); - void UnregisterGroupImpl(const ui64 internalGroupId); TAllocationInfo& GetAllocationInfoVerified(const ui64 allocationId) const { auto it = AllocationInfo.find(allocationId); AFL_VERIFY(it != AllocationInfo.end()); return *it->second; } + + void UnregisterGroupImpl(const ui64 internalGroupId) { + auto data = WaitAllocations.ExtractGroup(internalGroupId); + for (auto&& allocation : data) { + AFL_VERIFY(!allocation->Allocate(OwnerActorId)); + } + } + + const std::shared_ptr& RegisterAllocationImpl( + const ui64 internalGroupId, const std::shared_ptr& task, const std::shared_ptr& stage) { + auto it = AllocationInfo.find(task->GetIdentifier()); + if (it == AllocationInfo.end()) { + it = AllocationInfo + .emplace(task->GetIdentifier(), + std::make_shared(ExternalProcessId, ExternalScopeId, internalGroupId, task, stage)) + .first; + } + return it->second; + } + friend class TAllocationGroups; +public: + TProcessMemoryScope(const ui64 externalProcessId, const ui64 externalScopeId, const NActors::TActorId& ownerActorId) + : ExternalProcessId(externalProcessId) + , ExternalScopeId(externalScopeId) + , OwnerActorId(ownerActorId) { + } + + void Register() { + ++Links; + } + + [[nodiscard]] bool Unregister() { + if (--Links) { + return false; + } + for (auto&& [i, _] : GroupIds.GetInternalIdToExternalIds()) { + UnregisterGroupImpl(i); + } + GroupIds.Clear(); + AllocationInfo.clear(); + return true; + } + + void RegisterAllocation(const bool isPriorityProcess, const ui64 externalGroupId, const std::shared_ptr& task, + const std::shared_ptr& stage) { + AFL_VERIFY(task); + AFL_VERIFY(stage); + const std::optional internalGroupIdOptional = GroupIds.GetInternalIdOptional(externalGroupId); + if (!internalGroupIdOptional) { + AFL_VERIFY(!task->OnAllocated(std::make_shared(ExternalProcessId, ExternalScopeId, task->GetIdentifier(), OwnerActorId, task->GetMemory()), task))( + "ext_group", externalGroupId)( + "min_group", GroupIds.GetMinInternalIdOptional())("stage", stage->GetName()); + AFL_VERIFY(!AllocationInfo.contains(task->GetIdentifier())); + } else { + const ui64 internalGroupId = *internalGroupIdOptional; + auto allocationInfo = RegisterAllocationImpl(internalGroupId, task, stage); + + if (allocationInfo->GetAllocationStatus() != EAllocationStatus::Waiting) { + } else if (WaitAllocations.GetMinGroupId().value_or(internalGroupId) < internalGroupId) { + WaitAllocations.AddAllocation(internalGroupId, allocationInfo); + } else if (allocationInfo->IsAllocatable(0) || (isPriorityProcess && internalGroupId == GroupIds.GetMinInternalIdVerified())) { + Y_UNUSED(WaitAllocations.RemoveAllocation(internalGroupId, allocationInfo)); + if (!allocationInfo->Allocate(OwnerActorId)) { + UnregisterAllocation(allocationInfo->GetIdentifier()); + } + } else { + WaitAllocations.AddAllocation(internalGroupId, allocationInfo); + } + } + } + + bool UpdateAllocation(const ui64 allocationId, const ui64 volume) { + GetAllocationInfoVerified(allocationId).SetAllocatedVolume(volume); + return true; + } + + bool TryAllocateWaiting(const bool isPriorityProcess, const ui32 allocationsCountLimit) { + return WaitAllocations.Allocate(isPriorityProcess, *this, allocationsCountLimit); + } + + bool UnregisterAllocation(const ui64 allocationId) { + ui64 memoryAllocated = 0; + auto it = AllocationInfo.find(allocationId); + AFL_VERIFY(it != AllocationInfo.end()); + bool waitFlag = false; + const ui64 internalGroupId = it->second->GetAllocationInternalGroupId(); + switch (it->second->GetAllocationStatus()) { + case EAllocationStatus::Allocated: + case EAllocationStatus::Failed: + AFL_VERIFY(!WaitAllocations.RemoveAllocation(internalGroupId, it->second)); + break; + case EAllocationStatus::Waiting: + AFL_VERIFY(WaitAllocations.RemoveAllocation(internalGroupId, it->second)); + waitFlag = true; + break; + } + AFL_DEBUG(NKikimrServices::GROUPED_MEMORY_LIMITER)("event", "allocation_unregister")("allocation_id", allocationId)("wait", waitFlag)( + "internal_group_id", internalGroupId)("allocation_status", it->second->GetAllocationStatus()); + memoryAllocated = it->second->GetAllocatedVolume(); + AllocationInfo.erase(it); + return !!memoryAllocated; + } + + void UnregisterGroup(const bool isPriorityProcess, const ui64 externalGroupId) { + const ui64 internalGroupId = GroupIds.ExtractInternalIdVerified(externalGroupId); + AFL_INFO(NKikimrServices::GROUPED_MEMORY_LIMITER)("event", "remove_group")("external_group_id", externalGroupId)( + "internal_group_id", internalGroupId); + UnregisterGroupImpl(internalGroupId); + if (isPriorityProcess && (internalGroupId < GroupIds.GetMinInternalIdDef(internalGroupId))) { + Y_UNUSED(TryAllocateWaiting(isPriorityProcess, 0)); + } + } + + void RegisterGroup(const ui64 externalGroupId) { + Y_UNUSED(GroupIds.RegisterExternalId(externalGroupId)); + } +}; + +class TProcessMemory { +private: + const ui64 ExternalProcessId; + + const NActors::TActorId OwnerActorId; + bool PriorityProcessFlag = false; + YDB_ACCESSOR(ui32, LinksCount, 1); YDB_READONLY_DEF(std::vector>, Stages); const std::shared_ptr DefaultStage; + THashMap> AllocationScopes; + + TProcessMemoryScope* GetAllocationScopeOptional(const ui64 externalScopeId) const { + auto it = AllocationScopes.find(externalScopeId); + if (it == AllocationScopes.end()) { + return nullptr; + } + return it->second.get(); + } + + TProcessMemoryScope& GetAllocationScopeVerified(const ui64 externalScopeId) const { + return *TValidator::CheckNotNull(GetAllocationScopeOptional(externalScopeId)); + } public: bool IsPriorityProcess() const { return PriorityProcessFlag; } + bool UpdateAllocation(const ui64 externalScopeId, const ui64 allocationId, const ui64 volume) { + return GetAllocationScopeVerified(externalScopeId).UpdateAllocation(allocationId, volume); + } + + void RegisterAllocation( + const ui64 externalScopeId, const ui64 externalGroupId, const std::shared_ptr& task, const std::optional& stageIdx) { + AFL_VERIFY(task); + std::shared_ptr stage; + if (Stages.empty()) { + AFL_VERIFY(!stageIdx); + stage = DefaultStage; + } else { + AFL_VERIFY(stageIdx); + AFL_VERIFY(*stageIdx < Stages.size()); + stage = Stages[*stageIdx]; + } + AFL_VERIFY(stage); + auto& scope = GetAllocationScopeVerified(externalScopeId); + scope.RegisterAllocation(IsPriorityProcess(), externalGroupId, task, stage); + } + + bool UnregisterAllocation(const ui64 externalScopeId, const ui64 allocationId) { + if (auto* scope = GetAllocationScopeOptional(externalScopeId)) { + return scope->UnregisterAllocation(allocationId); + } + return false; + } + + void UnregisterGroup(const ui64 externalScopeId, const ui64 externalGroupId) { + if (auto* scope = GetAllocationScopeOptional(externalScopeId)) { + scope->UnregisterGroup(IsPriorityProcess(), externalGroupId); + } + } + + void RegisterGroup(const ui64 externalScopeId, const ui64 externalGroupId) { + GetAllocationScopeVerified(externalScopeId).RegisterGroup(externalGroupId); + } + + void UnregisterScope(const ui64 externalScopeId) { + auto it = AllocationScopes.find(externalScopeId); + AFL_VERIFY(it != AllocationScopes.end()); + if (it->second->Unregister()) { + AllocationScopes.erase(it); + } + + } + + void RegisterScope(const ui64 externalScopeId) { + auto it = AllocationScopes.find(externalScopeId); + if (it == AllocationScopes.end()) { + AFL_VERIFY(AllocationScopes.emplace(externalScopeId, std::make_shared(ExternalProcessId, externalScopeId, OwnerActorId)).second); + } else { + it->second->Register(); + } + + } + void SetPriorityProcess() { AFL_VERIFY(!PriorityProcessFlag); PriorityProcessFlag = true; @@ -44,24 +238,24 @@ class TProcessMemory { , OwnerActorId(ownerActorId) , PriorityProcessFlag(isPriority) , Stages(stages) - , DefaultStage(defaultStage) - { + , DefaultStage(defaultStage) { } - void RegisterAllocation(const ui64 externalGroupId, const std::shared_ptr& task, const std::optional& stageIdx); - bool UnregisterAllocation(const ui64 allocationId); - bool UpdateAllocation(const ui64 allocationId, const ui64 volume) { - GetAllocationInfoVerified(allocationId).SetAllocatedVolume(volume); - return true; + bool TryAllocateWaiting(const ui32 allocationsCountLimit) { + bool allocated = false; + for (auto&& i : AllocationScopes) { + if (i.second->TryAllocateWaiting(IsPriorityProcess(), allocationsCountLimit)) { + allocated = true; + } + } + return allocated; } - void Unregister(); - - void UnregisterGroup(const ui64 externalGroupId); - void RegisterGroup(const ui64 externalGroupId); - - bool TryAllocateWaiting(const ui32 allocationsCountLimit) { - return WaitAllocations.Allocate(*this, allocationsCountLimit); + void Unregister() { + for (auto&& i : AllocationScopes) { + Y_UNUSED(i.second->Unregister()); + } + AllocationScopes.clear(); } }; diff --git a/ydb/core/tx/limiter/grouped_memory/usage/abstract.cpp b/ydb/core/tx/limiter/grouped_memory/usage/abstract.cpp index 67910b9625d3..2d72f0039846 100644 --- a/ydb/core/tx/limiter/grouped_memory/usage/abstract.cpp +++ b/ydb/core/tx/limiter/grouped_memory/usage/abstract.cpp @@ -7,7 +7,8 @@ namespace NKikimr::NOlap::NGroupedMemoryManager { TAllocationGuard::~TAllocationGuard() { if (TlsActivationContext && !Released) { - NActors::TActivationContext::AsActorContext().Send(ActorId, std::make_unique(ProcessId, AllocationId)); + NActors::TActivationContext::AsActorContext().Send( + ActorId, std::make_unique(ProcessId, ScopeId, AllocationId)); } } @@ -16,7 +17,7 @@ void TAllocationGuard::Update(const ui64 newVolume) { Memory = newVolume; if (TlsActivationContext) { NActors::TActivationContext::AsActorContext().Send( - ActorId, std::make_unique(ProcessId, AllocationId, newVolume)); + ActorId, std::make_unique(ProcessId, ScopeId, AllocationId, newVolume)); } } @@ -30,16 +31,19 @@ bool IAllocation::OnAllocated(std::shared_ptr&& guard, const s TGroupGuard::~TGroupGuard() { if (TlsActivationContext) { - NActors::TActivationContext::AsActorContext().Send(ActorId, std::make_unique(ProcessId, GroupId)); + NActors::TActivationContext::AsActorContext().Send( + ActorId, std::make_unique(ProcessId, ExternalScopeId, GroupId)); } } -TGroupGuard::TGroupGuard(const NActors::TActorId& actorId, const ui64 processId, const ui64 groupId) +TGroupGuard::TGroupGuard(const NActors::TActorId& actorId, const ui64 processId, const ui64 externalScopeId, const ui64 groupId) : ActorId(actorId) , ProcessId(processId) + , ExternalScopeId(externalScopeId) , GroupId(groupId) { if (TlsActivationContext) { - NActors::TActivationContext::AsActorContext().Send(ActorId, std::make_unique(ProcessId, GroupId)); + NActors::TActivationContext::AsActorContext().Send( + ActorId, std::make_unique(ProcessId, ExternalScopeId, GroupId)); } } @@ -57,4 +61,19 @@ TProcessGuard::TProcessGuard(const NActors::TActorId& actorId, const ui64 proces } } +TScopeGuard::~TScopeGuard() { + if (TlsActivationContext) { + NActors::TActivationContext::AsActorContext().Send(ActorId, std::make_unique(ProcessId, ScopeId)); + } +} + +TScopeGuard::TScopeGuard(const NActors::TActorId& actorId, const ui64 processId, const ui64 scopeId) + : ActorId(actorId) + , ProcessId(processId) + , ScopeId(scopeId) { + if (TlsActivationContext) { + NActors::TActivationContext::AsActorContext().Send(ActorId, std::make_unique(ProcessId, ScopeId)); + } +} + } // namespace NKikimr::NOlap::NGroupedMemoryManager diff --git a/ydb/core/tx/limiter/grouped_memory/usage/abstract.h b/ydb/core/tx/limiter/grouped_memory/usage/abstract.h index c65be421b377..d92120f46fb6 100644 --- a/ydb/core/tx/limiter/grouped_memory/usage/abstract.h +++ b/ydb/core/tx/limiter/grouped_memory/usage/abstract.h @@ -14,10 +14,11 @@ class TGroupGuard { private: const NActors::TActorId ActorId; YDB_READONLY(ui64, ProcessId, 0); + YDB_READONLY(ui64, ExternalScopeId, 0); YDB_READONLY(ui64, GroupId, 0); public: - TGroupGuard(const NActors::TActorId& actorId, const ui64 processId, const ui64 groupId); + TGroupGuard(const NActors::TActorId& actorId, const ui64 processId, const ui64 externalScopeId, const ui64 groupId); ~TGroupGuard(); }; @@ -33,18 +34,32 @@ class TProcessGuard { ~TProcessGuard(); }; +class TScopeGuard { +private: + const NActors::TActorId ActorId; + YDB_READONLY(ui64, ProcessId, 0); + YDB_READONLY(ui64, ScopeId, 0); + +public: + TScopeGuard(const NActors::TActorId& actorId, const ui64 processId, const ui64 scopeId); + + ~TScopeGuard(); +}; + class TAllocationGuard { private: const NActors::TActorId ActorId; YDB_READONLY(ui64, ProcessId, 0) + YDB_READONLY(ui64, ScopeId, 0) YDB_READONLY(ui64, AllocationId, 0) YDB_READONLY(ui64, Memory, 0) bool Released = false; public: - TAllocationGuard(const ui64 processId, const ui64 allocationId, const NActors::TActorId actorId, const ui64 memory) + TAllocationGuard(const ui64 processId, const ui64 scopeId, const ui64 allocationId, const NActors::TActorId actorId, const ui64 memory) : ActorId(actorId) , ProcessId(processId) + , ScopeId(scopeId) , AllocationId(allocationId) , Memory(memory) { } diff --git a/ydb/core/tx/limiter/grouped_memory/usage/events.h b/ydb/core/tx/limiter/grouped_memory/usage/events.h index 3829f9a03d4d..d3a8200c584c 100644 --- a/ydb/core/tx/limiter/grouped_memory/usage/events.h +++ b/ydb/core/tx/limiter/grouped_memory/usage/events.h @@ -17,6 +17,8 @@ struct TEvExternal { EvUpdateAllocationTask, EvStartAllocationProcess, EvFinishAllocationProcess, + EvStartAllocationProcessScope, + EvFinishAllocationProcessScope, EvEnd }; @@ -25,15 +27,17 @@ struct TEvExternal { YDB_READONLY_DEF(std::vector>, Allocations); YDB_READONLY_DEF(std::optional, StageFeaturesIdx); YDB_READONLY(ui64, ExternalProcessId, 0); + YDB_READONLY(ui64, ExternalScopeId, 0); YDB_READONLY(ui64, ExternalGroupId, 0); public: - explicit TEvStartTask(const ui64 externalProcessId, + explicit TEvStartTask(const ui64 externalProcessId, const ui64 externalScopeId, const ui64 externalGroupId, const std::vector>& allocations, const std::optional& stageFeaturesIdx) : Allocations(allocations) , StageFeaturesIdx(stageFeaturesIdx) , ExternalProcessId(externalProcessId) + , ExternalScopeId(externalScopeId) , ExternalGroupId(externalGroupId) { AFL_VERIFY(Allocations.size()); } @@ -42,11 +46,13 @@ struct TEvExternal { class TEvFinishTask: public NActors::TEventLocal { private: YDB_READONLY(ui64, ExternalProcessId, 0); + YDB_READONLY(ui64, ExternalScopeId, 0); YDB_READONLY(ui64, AllocationId, 0); public: - explicit TEvFinishTask(const ui64 externalProcessId, const ui64 allocationId) + explicit TEvFinishTask(const ui64 externalProcessId, const ui64 externalScopeId, const ui64 allocationId) : ExternalProcessId(externalProcessId) + , ExternalScopeId(externalScopeId) , AllocationId(allocationId) { } }; @@ -54,12 +60,14 @@ struct TEvExternal { class TEvUpdateTask: public NActors::TEventLocal { private: YDB_READONLY(ui64, ExternalProcessId, 0); + YDB_READONLY(ui64, ExternalScopeId, 0); YDB_READONLY(ui64, AllocationId, 0); YDB_READONLY(ui64, Volume, 0); public: - explicit TEvUpdateTask(const ui64 externalProcessId, const ui64 allocationId, const ui64 volume) + explicit TEvUpdateTask(const ui64 externalProcessId, const ui64 externalScopeId, const ui64 allocationId, const ui64 volume) : ExternalProcessId(externalProcessId) + , ExternalScopeId(externalScopeId) , AllocationId(allocationId) , Volume(volume) { } @@ -68,11 +76,13 @@ struct TEvExternal { class TEvFinishGroup: public NActors::TEventLocal { private: YDB_READONLY(ui64, ExternalProcessId, 0); + YDB_READONLY(ui32, ExternalScopeId, 0); YDB_READONLY(ui64, ExternalGroupId, 0); public: - explicit TEvFinishGroup(const ui64 externalProcessId, const ui64 externalGroupId) + explicit TEvFinishGroup(const ui64 externalProcessId, const ui32 externalScopeId, const ui64 externalGroupId) : ExternalProcessId(externalProcessId) + , ExternalScopeId(externalScopeId) , ExternalGroupId(externalGroupId) { } }; @@ -80,11 +90,13 @@ struct TEvExternal { class TEvStartGroup: public NActors::TEventLocal { private: YDB_READONLY(ui64, ExternalProcessId, 0); + YDB_READONLY(ui32, ExternalScopeId, 0); YDB_READONLY(ui64, ExternalGroupId, 0); public: - explicit TEvStartGroup(const ui64 externalProcessId, const ui64 externalGroupId) + explicit TEvStartGroup(const ui64 externalProcessId, const ui32 externalScopeId, const ui64 externalGroupId) : ExternalProcessId(externalProcessId) + , ExternalScopeId(externalScopeId) , ExternalGroupId(externalGroupId) { } }; @@ -109,5 +121,30 @@ struct TEvExternal { , Stages(stages) { } }; + + class TEvFinishProcessScope: public NActors::TEventLocal { + private: + YDB_READONLY(ui64, ExternalProcessId, 0); + YDB_READONLY(ui64, ExternalScopeId, 0); + + public: + explicit TEvFinishProcessScope(const ui64 externalProcessId, const ui64 externalScopeId) + : ExternalProcessId(externalProcessId) + , ExternalScopeId(externalScopeId) + { + } + }; + + class TEvStartProcessScope: public NActors::TEventLocal { + private: + YDB_READONLY(ui64, ExternalProcessId, 0); + YDB_READONLY(ui64, ExternalScopeId, 0); + + public: + explicit TEvStartProcessScope(const ui64 externalProcessId, const ui64 externalScopeId) + : ExternalProcessId(externalProcessId) + , ExternalScopeId(externalScopeId) { + } + }; }; } // namespace NKikimr::NOlap::NGroupedMemoryManager::NEvents diff --git a/ydb/core/tx/limiter/grouped_memory/usage/service.h b/ydb/core/tx/limiter/grouped_memory/usage/service.h index 166f3c1827e4..8192743218b1 100644 --- a/ydb/core/tx/limiter/grouped_memory/usage/service.h +++ b/ydb/core/tx/limiter/grouped_memory/usage/service.h @@ -44,11 +44,17 @@ class TServiceOperatorImpl { return Singleton()->DefaultStageFeatures; } - static std::shared_ptr BuildGroupGuard(const ui64 processId) { + static std::shared_ptr BuildGroupGuard(const ui64 processId, const ui32 scopeId) { static TAtomicCounter counter = 0; auto& context = NActors::TActorContext::AsActorContext(); const NActors::TActorId& selfId = context.SelfID; - return std::make_shared(MakeServiceId(selfId.NodeId()), processId, counter.Inc()); + return std::make_shared(MakeServiceId(selfId.NodeId()), processId, scopeId, counter.Inc()); + } + + static std::shared_ptr BuildScopeGuard(const ui64 processId, const ui32 scopeId) { + auto& context = NActors::TActorContext::AsActorContext(); + const NActors::TActorId& selfId = context.SelfID; + return std::make_shared(MakeServiceId(selfId.NodeId()), processId, scopeId); } static std::shared_ptr BuildProcessGuard(const ui64 processId, const std::vector>& stages) { @@ -57,17 +63,18 @@ class TServiceOperatorImpl { return std::make_shared(MakeServiceId(selfId.NodeId()), processId, stages); } - static bool SendToAllocation(const ui64 processId, const ui64 groupId, const std::vector>& tasks, + static bool SendToAllocation(const ui64 processId, const ui64 scopeId, const ui64 groupId, + const std::vector>& tasks, const std::optional& stageIdx) { auto& context = NActors::TActorContext::AsActorContext(); const NActors::TActorId& selfId = context.SelfID; if (TSelf::IsEnabled()) { - context.Send(MakeServiceId(selfId.NodeId()), new NEvents::TEvExternal::TEvStartTask(processId, groupId, tasks, stageIdx)); + context.Send(MakeServiceId(selfId.NodeId()), new NEvents::TEvExternal::TEvStartTask(processId, scopeId, groupId, tasks, stageIdx)); return true; } else { for (auto&& i : tasks) { if (!i->IsAllocated()) { - AFL_VERIFY(i->OnAllocated(std::make_shared(0, 0, NActors::TActorId(), i->GetMemory()), i)); + AFL_VERIFY(i->OnAllocated(std::make_shared(0, 0, 0, NActors::TActorId(), i->GetMemory()), i)); } } return false; diff --git a/ydb/core/tx/limiter/grouped_memory/ut/ut_manager.cpp b/ydb/core/tx/limiter/grouped_memory/ut/ut_manager.cpp index ac9abcdf5b48..277d62903205 100644 --- a/ydb/core/tx/limiter/grouped_memory/ut/ut_manager.cpp +++ b/ydb/core/tx/limiter/grouped_memory/ut/ut_manager.cpp @@ -42,24 +42,26 @@ Y_UNIT_TEST_SUITE(GroupedMemoryLimiter) { { auto alloc1 = std::make_shared(50); manager->RegisterProcess(0, {}); - manager->RegisterGroup(0, 1); - manager->RegisterAllocation(0, 1, alloc1, {}); + manager->RegisterProcessScope(0, 0); + manager->RegisterGroup(0, 0, 1); + manager->RegisterAllocation(0, 0, 1, alloc1, {}); AFL_VERIFY(alloc1->IsAllocated()); auto alloc1_1 = std::make_shared(50); - manager->RegisterAllocation(0, 1, alloc1_1, {}); + manager->RegisterAllocation(0, 0, 1, alloc1_1, {}); AFL_VERIFY(alloc1_1->IsAllocated()); - manager->RegisterGroup(0, 2); + manager->RegisterGroup(0, 0, 2); auto alloc2 = std::make_shared(50); - manager->RegisterAllocation(0, 2, alloc2, {}); + manager->RegisterAllocation(0, 0, 2, alloc2, {}); AFL_VERIFY(!alloc2->IsAllocated()); - manager->UnregisterAllocation(0, alloc1->GetIdentifier()); + manager->UnregisterAllocation(0, 0, alloc1->GetIdentifier()); AFL_VERIFY(alloc2->IsAllocated()); - manager->UnregisterAllocation(0, alloc2->GetIdentifier()); - manager->UnregisterAllocation(0, alloc1_1->GetIdentifier()); - manager->UnregisterGroup(0, 1); - manager->UnregisterGroup(0, 2); + manager->UnregisterAllocation(0, 0, alloc2->GetIdentifier()); + manager->UnregisterAllocation(0, 0, alloc1_1->GetIdentifier()); + manager->UnregisterGroup(0, 0, 1); + manager->UnregisterGroup(0, 0, 2); + manager->UnregisterProcessScope(0, 0); manager->UnregisterProcess(0); } AFL_VERIFY(!stage->GetUsage().Val()); @@ -80,36 +82,38 @@ Y_UNIT_TEST_SUITE(GroupedMemoryLimiter) { auto manager = std::make_shared(NActors::TActorId(), config, "test", counters, stage); { manager->RegisterProcess(0, {}); + manager->RegisterProcessScope(0, 0); auto alloc1 = std::make_shared(10); - manager->RegisterGroup(0, 1); - manager->RegisterAllocation(0, 1, alloc1, {}); + manager->RegisterGroup(0, 0, 1); + manager->RegisterAllocation(0, 0, 1, alloc1, {}); AFL_VERIFY(alloc1->IsAllocated()); auto alloc2 = std::make_shared(1000); - manager->RegisterGroup(0, 2); - manager->RegisterAllocation(0, 2, alloc2, {}); + manager->RegisterGroup(0, 0, 2); + manager->RegisterAllocation(0, 0, 2, alloc2, {}); AFL_VERIFY(!alloc2->IsAllocated()); auto alloc3 = std::make_shared(1000); - manager->RegisterGroup(0, 3); - manager->RegisterAllocation(0, 3, alloc3, {}); + manager->RegisterGroup(0, 0, 3); + manager->RegisterAllocation(0, 0, 3, alloc3, {}); AFL_VERIFY(alloc1->IsAllocated()); AFL_VERIFY(!alloc2->IsAllocated()); AFL_VERIFY(!alloc3->IsAllocated()); auto alloc1_1 = std::make_shared(1000); - manager->RegisterAllocation(0, 1, alloc1_1, {}); + manager->RegisterAllocation(0, 0, 1, alloc1_1, {}); AFL_VERIFY(alloc1_1->IsAllocated()); AFL_VERIFY(!alloc2->IsAllocated()); - manager->UnregisterAllocation(0, alloc1_1->GetIdentifier()); + manager->UnregisterAllocation(0, 0, alloc1_1->GetIdentifier()); AFL_VERIFY(!alloc2->IsAllocated()); - manager->UnregisterGroup(0, 1); + manager->UnregisterGroup(0, 0, 1); AFL_VERIFY(alloc2->IsAllocated()); - manager->UnregisterAllocation(0, alloc1->GetIdentifier()); + manager->UnregisterAllocation(0, 0, alloc1->GetIdentifier()); AFL_VERIFY(!alloc3->IsAllocated()); - manager->UnregisterGroup(0, 2); - manager->UnregisterAllocation(0, alloc2->GetIdentifier()); + manager->UnregisterGroup(0, 0, 2); + manager->UnregisterAllocation(0, 0, alloc2->GetIdentifier()); AFL_VERIFY(alloc3->IsAllocated()); - manager->UnregisterGroup(0, 3); - manager->UnregisterAllocation(0, alloc3->GetIdentifier()); + manager->UnregisterGroup(0, 0, 3); + manager->UnregisterAllocation(0, 0, alloc3->GetIdentifier()); + manager->UnregisterProcessScope(0, 0); manager->UnregisterProcess(0); } AFL_VERIFY(!stage->GetUsage().Val()); @@ -131,44 +135,45 @@ Y_UNIT_TEST_SUITE(GroupedMemoryLimiter) { auto manager = std::make_shared(NActors::TActorId(), config, "test", counters, stage); { manager->RegisterProcess(0, {}); - manager->RegisterGroup(0, 1); + manager->RegisterProcessScope(0, 0); + manager->RegisterGroup(0, 0, 1); auto alloc0 = std::make_shared(1000); - manager->RegisterAllocation(0, 1, alloc0, {}); + manager->RegisterAllocation(0, 0, 1, alloc0, {}); auto alloc1 = std::make_shared(1000); - manager->RegisterAllocation(0, 1, alloc1, {}); + manager->RegisterAllocation(0, 0, 1, alloc1, {}); AFL_VERIFY(alloc0->IsAllocated()); AFL_VERIFY(alloc1->IsAllocated()); - manager->RegisterGroup(0, 2); + manager->RegisterGroup(0, 0, 2); auto alloc2 = std::make_shared(1000); - manager->RegisterAllocation(0, 2, alloc0, {}); - manager->RegisterAllocation(0, 2, alloc2, {}); + manager->RegisterAllocation(0, 0, 2, alloc0, {}); + manager->RegisterAllocation(0, 0, 2, alloc2, {}); AFL_VERIFY(alloc0->IsAllocated()); AFL_VERIFY(!alloc2->IsAllocated()); auto alloc3 = std::make_shared(1000); - manager->RegisterGroup(0, 3); - manager->RegisterAllocation(0, 3, alloc0, {}); - manager->RegisterAllocation(0, 3, alloc3, {}); + manager->RegisterGroup(0, 0, 3); + manager->RegisterAllocation(0, 0, 3, alloc0, {}); + manager->RegisterAllocation(0, 0, 3, alloc3, {}); AFL_VERIFY(alloc0->IsAllocated()); AFL_VERIFY(alloc1->IsAllocated()); AFL_VERIFY(!alloc2->IsAllocated()); AFL_VERIFY(!alloc3->IsAllocated()); - manager->UnregisterGroup(0, 1); - manager->UnregisterAllocation(0, alloc1->GetIdentifier()); + manager->UnregisterGroup(0, 0, 1); + manager->UnregisterAllocation(0, 0, alloc1->GetIdentifier()); AFL_VERIFY(alloc0->IsAllocated()); AFL_VERIFY(alloc2->IsAllocated()); AFL_VERIFY(!alloc3->IsAllocated()); - manager->UnregisterGroup(0, 2); - manager->UnregisterAllocation(0, alloc2->GetIdentifier()); + manager->UnregisterGroup(0, 0, 2); + manager->UnregisterAllocation(0, 0, alloc2->GetIdentifier()); AFL_VERIFY(alloc0->IsAllocated()); AFL_VERIFY(alloc3->IsAllocated()); - manager->UnregisterGroup(0, 3); - manager->UnregisterAllocation(0, alloc3->GetIdentifier()); - manager->UnregisterAllocation(0, alloc0->GetIdentifier()); + manager->UnregisterGroup(0, 0, 3); + manager->UnregisterAllocation(0, 0, alloc3->GetIdentifier()); + manager->UnregisterAllocation(0, 0, alloc0->GetIdentifier()); manager->UnregisterProcess(0); } AFL_VERIFY(!stage->GetUsage().Val()); @@ -190,23 +195,24 @@ Y_UNIT_TEST_SUITE(GroupedMemoryLimiter) { auto manager = std::make_shared(NActors::TActorId(), config, "test", counters, stage); { manager->RegisterProcess(0, {}); + manager->RegisterProcessScope(0, 0); auto alloc1 = std::make_shared(1000); - manager->RegisterGroup(0, 1); - manager->RegisterAllocation(0, 1, alloc1, {}); + manager->RegisterGroup(0, 0, 1); + manager->RegisterAllocation(0, 0, 1, alloc1, {}); AFL_VERIFY(alloc1->IsAllocated()); auto alloc2 = std::make_shared(10); - manager->RegisterGroup(0, 3); - manager->RegisterAllocation(0, 3, alloc2, {}); + manager->RegisterGroup(0, 0, 3); + manager->RegisterAllocation(0, 0, 3, alloc2, {}); AFL_VERIFY(!alloc2->IsAllocated()); - manager->UpdateAllocation(0, alloc1->GetIdentifier(), 10); + manager->UpdateAllocation(0, 0, alloc1->GetIdentifier(), 10); AFL_VERIFY(alloc2->IsAllocated()); - manager->UnregisterGroup(0, 3); - manager->UnregisterAllocation(0, alloc2->GetIdentifier()); + manager->UnregisterGroup(0, 0, 3); + manager->UnregisterAllocation(0, 0, alloc2->GetIdentifier()); - manager->UnregisterGroup(0, 1); - manager->UnregisterAllocation(0, alloc1->GetIdentifier()); + manager->UnregisterGroup(0, 0, 1); + manager->UnregisterAllocation(0, 0, alloc1->GetIdentifier()); manager->UnregisterProcess(0); } AFL_VERIFY(!stage->GetUsage().Val());