Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 10 additions & 6 deletions ydb/core/kqp/compute_actor/kqp_scan_compute_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ class TComputeTaskData;
class TShardScannerInfo {
private:
std::optional<TActorId> ActorId;
const ui64 ScanId;
const ui64 TabletId;
const ui64 Generation;
i64 DataChunksInFlightCount = 0;
Expand Down Expand Up @@ -51,15 +52,16 @@ class TShardScannerInfo {
}
}
public:
TShardScannerInfo(TShardState& state, const IExternalObjectsProvider& externalObjectsProvider)
: TabletId(state.TabletId)
TShardScannerInfo(const ui64 scanId, TShardState& state, const IExternalObjectsProvider& externalObjectsProvider)
: ScanId(scanId)
, TabletId(state.TabletId)
, Generation(++state.Generation)
{
const bool subscribed = std::exchange(state.SubscribedOnTablet, true);

const auto& keyColumnTypes = externalObjectsProvider.GetKeyColumnTypes();
auto ranges = state.GetScanRanges(keyColumnTypes);
auto ev = externalObjectsProvider.BuildEvKqpScan(0, Generation, ranges);
auto ev = externalObjectsProvider.BuildEvKqpScan(ScanId, Generation, ranges);

AFL_DEBUG(NKikimrServices::KQP_COMPUTE)("event", "start_scanner")("tablet_id", TabletId)("generation", Generation)
("info", state.ToString(keyColumnTypes))("range", DebugPrintRanges(keyColumnTypes, ranges, *AppData()->TypeRegistry))
Expand Down Expand Up @@ -250,6 +252,7 @@ class TInFlightShards: public NComputeActor::TScanShardsStatistics {
THashMap<NActors::TActorId, TShardState::TPtr> ShardsByActorId;
bool IsActiveFlag = true;
THashMap<ui64, std::shared_ptr<TShardScannerInfo>> ShardScanners;
const ui64 ScanId;
const IExternalObjectsProvider& ExternalObjectsProvider;
public:

Expand Down Expand Up @@ -313,7 +316,7 @@ class TInFlightShards: public NComputeActor::TScanShardsStatistics {
AFL_ENSURE(state.TabletId);
AFL_ENSURE(!state.ActorId)("actor_id", state.ActorId);
state.State = NComputeActor::EShardState::Starting;
auto newScanner = std::make_shared<TShardScannerInfo>(state, ExternalObjectsProvider);
auto newScanner = std::make_shared<TShardScannerInfo>(ScanId, state, ExternalObjectsProvider);
AFL_ENSURE(ShardScanners.emplace(state.TabletId, newScanner).second);
}

Expand Down Expand Up @@ -356,8 +359,9 @@ class TInFlightShards: public NComputeActor::TScanShardsStatistics {
return nullptr;
}

TInFlightShards(const IExternalObjectsProvider& externalObjectsProvider)
: ExternalObjectsProvider(externalObjectsProvider)
TInFlightShards(const ui64 scanId, const IExternalObjectsProvider& externalObjectsProvider)
: ScanId(scanId)
, ExternalObjectsProvider(externalObjectsProvider)
{
}
bool IsActive() const {
Expand Down
21 changes: 13 additions & 8 deletions ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ TKqpScanFetcherActor::TKqpScanFetcherActor(const NKikimrKqp::TKqpSnapshot& snaps
, Snapshot(snapshot)
, ShardsScanningPolicy(shardsScanningPolicy)
, Counters(counters)
, InFlightShards(*this)
, InFlightShards(ScanId, *this)
, InFlightComputes(ComputeActorIds)
{
Y_UNUSED(traceId);
Expand Down Expand Up @@ -86,7 +86,11 @@ void TKqpScanFetcherActor::Bootstrap() {

void TKqpScanFetcherActor::HandleExecute(TEvScanExchange::TEvAckData::TPtr& ev) {
Y_ABORT_UNLESS(ev->Get()->GetFreeSpace());
ALS_DEBUG(NKikimrServices::KQP_COMPUTE) << "EvAckData (" << SelfId() << "): " << ev->Sender;
AFL_DEBUG(NKikimrServices::KQP_COMPUTE)("event", "AckDataFromCompute")("self_id", SelfId())("scan_id", ScanId)
("packs_to_send", InFlightComputes.GetPacksToSendCount())
("from", ev->Sender)("shards remain", PendingShards.size())
("in flight scans", InFlightShards.GetScansCount())
("in flight shards", InFlightShards.GetShardsCount());
InFlightComputes.OnComputeAck(ev->Sender, ev->Get()->GetFreeSpace());
CheckFinish();
}
Expand Down Expand Up @@ -458,12 +462,13 @@ void TKqpScanFetcherActor::ProcessPendingScanDataItem(TEvKqpCompute::TEvScanData

state->LastKey = std::move(msg.LastKey);
const ui64 rowsCount = msg.GetRowsCount();
CA_LOG_D("action=got EvScanData;rows=" << rowsCount << ";finished=" << msg.Finished << ";exceeded=" << msg.RequestedBytesLimitReached
<< ";from=" << ev->Sender << ";shards remain=" << PendingShards.size()
<< ";in flight scans=" << InFlightShards.GetScansCount()
<< ";in flight shards=" << InFlightShards.GetShardsCount()
<< ";delayed_for=" << latency.SecondsFloat() << " seconds by ratelimiter"
<< ";tablet_id=" << state->TabletId);
AFL_DEBUG(NKikimrServices::KQP_COMPUTE)("action","got EvScanData")("rows", rowsCount)("finished", msg.Finished)("exceeded", msg.RequestedBytesLimitReached)
("scan", ScanId)("packs_to_send", InFlightComputes.GetPacksToSendCount())
("from", ev->Sender)("shards remain", PendingShards.size())
("in flight scans", InFlightShards.GetScansCount())
("in flight shards", InFlightShards.GetShardsCount())
("delayed_for_seconds_by_ratelimiter", latency.SecondsFloat())
("tablet_id", state->TabletId);
auto shardScanner = InFlightShards.GetShardScannerVerified(state->TabletId);
auto tasksForCompute = shardScanner->OnReceiveData(msg, shardScanner);
AFL_ENSURE(tasksForCompute.size() == 1 || tasksForCompute.size() == 0 || tasksForCompute.size() == ComputeActorIds.size())("size", tasksForCompute.size())("compute_size", ComputeActorIds.size());
Expand Down
3 changes: 3 additions & 0 deletions ydb/core/kqp/compute_actor/kqp_scan_fetcher_actor.h
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,9 @@ class TKqpScanFetcherActor: public NActors::TActorBootstrapped<TKqpScanFetcherAc
std::deque<TShardState> PendingShards;
std::deque<TShardState> PendingResolveShards;

static inline TAtomicCounter ScanIdCounter = 0;
const ui64 ScanId = ScanIdCounter.Inc();

TInFlightShards InFlightShards;
TInFlightComputes InFlightComputes;
ui32 TotalRetries = 0;
Expand Down
25 changes: 18 additions & 7 deletions ydb/core/tx/columnshard/engines/reader/abstract/read_context.h
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
#pragma once
#include "read_metadata.h"

#include <ydb/core/protos/tx_datashard.pb.h>
#include <ydb/core/tx/columnshard/counters/scan.h>
#include <ydb/core/tx/columnshard/resource_subscriber/task.h>
#include <ydb/core/tx/columnshard/blobs_action/abstract/storages_manager.h>
#include <ydb/core/tx/columnshard/counters/scan.h>
#include <ydb/core/tx/columnshard/engines/reader/common/result.h>
#include <ydb/core/tx/columnshard/resource_subscriber/task.h>

#include <ydb/library/accessor/accessor.h>

namespace NKikimr::NOlap::NReader {
Expand All @@ -13,6 +15,7 @@ class TComputeShardingPolicy {
private:
YDB_READONLY(ui32, ShardsCount, 0);
YDB_READONLY_DEF(std::vector<std::string>, ColumnNames);

public:
TString DebugString() const {
return TStringBuilder() << "shards_count:" << ShardsCount << ";columns=" << JoinSeq(",", ColumnNames) << ";";
Expand Down Expand Up @@ -42,10 +45,12 @@ class TReadContext {
const NColumnShard::TConcreteScanCounters Counters;
TReadMetadataBase::TConstPtr ReadMetadata;
NResourceBroker::NSubscribe::TTaskContext ResourcesTaskContext;
const ui64 ScanId;
const TActorId ScanActorId;
const TActorId ResourceSubscribeActorId;
const TActorId ReadCoordinatorActorId;
const TComputeShardingPolicy ComputeShardingPolicy;

public:
template <class T>
std::shared_ptr<const T> GetReadMetadataPtrVerifiedAs() const {
Expand Down Expand Up @@ -74,6 +79,10 @@ class TReadContext {
return ScanActorId;
}

ui64 GetScanId() const {
return ScanId;
}

const TReadMetadataBase::TConstPtr& GetReadMetadata() const {
return ReadMetadata;
}
Expand All @@ -86,17 +95,18 @@ class TReadContext {
return ResourcesTaskContext;
}

TReadContext(const std::shared_ptr<IStoragesManager>& storagesManager, const NColumnShard::TConcreteScanCounters& counters, const TReadMetadataBase::TConstPtr& readMetadata,
const TActorId& scanActorId, const TActorId& resourceSubscribeActorId, const TActorId& readCoordinatorActorId, const TComputeShardingPolicy& computeShardingPolicy)
TReadContext(const std::shared_ptr<IStoragesManager>& storagesManager, const NColumnShard::TConcreteScanCounters& counters,
const TReadMetadataBase::TConstPtr& readMetadata, const TActorId& scanActorId, const TActorId& resourceSubscribeActorId,
const TActorId& readCoordinatorActorId, const TComputeShardingPolicy& computeShardingPolicy, const ui64 scanId)
: StoragesManager(storagesManager)
, Counters(counters)
, ReadMetadata(readMetadata)
, ResourcesTaskContext("CS::SCAN_READ", counters.ResourcesSubscriberCounters)
, ScanId(scanId)
, ScanActorId(scanActorId)
, ResourceSubscribeActorId(resourceSubscribeActorId)
, ReadCoordinatorActorId(readCoordinatorActorId)
, ComputeShardingPolicy(computeShardingPolicy)
{
, ComputeShardingPolicy(computeShardingPolicy) {
Y_ABORT_UNLESS(ReadMetadata);
}
};
Expand All @@ -111,6 +121,7 @@ class IDataReader {
virtual bool DoIsFinished() const = 0;
virtual std::vector<std::shared_ptr<TPartialReadResult>> DoExtractReadyResults(const int64_t maxRowsInBatch) = 0;
virtual TConclusion<bool> DoReadNextInterval() = 0;

public:
IDataReader(const std::shared_ptr<TReadContext>& context);
virtual ~IDataReader() = default;
Expand Down Expand Up @@ -171,4 +182,4 @@ class IDataReader {
}
};

}
} // namespace NKikimr::NOlap::NReader
2 changes: 1 addition & 1 deletion ydb/core/tx/columnshard/engines/reader/actor/actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ void TColumnShardScan::Bootstrap(const TActorContext& ctx) {
ReadCoordinatorActorId = ctx.Register(new NBlobOperations::NRead::TReadCoordinatorActor(TabletId, SelfId()));

std::shared_ptr<TReadContext> context = std::make_shared<TReadContext>(StoragesManager, ScanCountersPool,
ReadMetadataRange, SelfId(), ResourceSubscribeActorId, ReadCoordinatorActorId, ComputeShardingPolicy);
ReadMetadataRange, SelfId(), ResourceSubscribeActorId, ReadCoordinatorActorId, ComputeShardingPolicy, ScanId);
ScanIterator = ReadMetadataRange->StartScan(context);
auto startResult = ScanIterator->Start();
StartInstant = TMonotonic::Now();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,8 @@ TSpecialReadContext::TSpecialReadContext(const std::shared_ptr<TReadContext>& co
};
ProcessMemoryGuard =
NGroupedMemoryManager::TScanMemoryLimiterOperator::BuildProcessGuard(CommonContext->GetReadMetadata()->GetTxId(), stages);
ProcessScopeGuard =
NGroupedMemoryManager::TScanMemoryLimiterOperator::BuildScopeGuard(CommonContext->GetReadMetadata()->GetTxId(), GetCommonContext()->GetScanId());

auto readSchema = ReadMetadata->GetResultSchema();
SpecColumns = std::make_shared<TColumnsSet>(TIndexInfo::GetSnapshotColumnIdsSet(), readSchema);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ class TSpecialReadContext {
private:
YDB_READONLY_DEF(std::shared_ptr<TReadContext>, CommonContext);
YDB_READONLY_DEF(std::shared_ptr<NGroupedMemoryManager::TProcessGuard>, ProcessMemoryGuard);
YDB_READONLY_DEF(std::shared_ptr<NGroupedMemoryManager::TScopeGuard>, ProcessScopeGuard);

YDB_READONLY_DEF(std::shared_ptr<TColumnsSet>, SpecColumns);
YDB_READONLY_DEF(std::shared_ptr<TColumnsSet>, MergeColumns);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -222,8 +222,8 @@ TConclusion<bool> TAllocateMemoryStep::DoExecuteInplace(
const std::shared_ptr<IDataSource>& source, const TFetchingScriptCursor& step) const {

auto allocation = std::make_shared<TFetchingStepAllocation>(source, GetProcessingDataSize(source), step);
NGroupedMemoryManager::TScanMemoryLimiterOperator::SendToAllocation(
source->GetContext()->GetProcessMemoryControlId(), source->GetFirstIntervalId(), { allocation }, (ui32)StageIndex);
NGroupedMemoryManager::TScanMemoryLimiterOperator::SendToAllocation(source->GetContext()->GetProcessMemoryControlId(),
source->GetContext()->GetCommonContext()->GetScanId(), source->GetFirstIntervalId(), { allocation }, (ui32)StageIndex);
return false;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ void TFetchingInterval::ConstructResult() {

auto task = std::make_shared<TStartMergeTask>(MergingContext, Context, std::move(Sources));
task->SetPriority(NConveyor::ITask::EPriority::High);
NGroupedMemoryManager::TScanMemoryLimiterOperator::SendToAllocation(
Context->GetProcessMemoryControlId(), GetIntervalId(), { task }, (ui32)EStageFeaturesIndexes::Merge);
NGroupedMemoryManager::TScanMemoryLimiterOperator::SendToAllocation(Context->GetProcessMemoryControlId(),
Context->GetCommonContext()->GetScanId(), GetIntervalId(), { task }, (ui32)EStageFeaturesIndexes::Merge);
}
}

Expand All @@ -41,14 +41,16 @@ TFetchingInterval::TFetchingInterval(const NArrow::NMerger::TSortableBatchPositi
, TaskGuard(Context->GetCommonContext()->GetCounters().GetResourcesAllocationTasksGuard())
, Sources(sources)
, IntervalIdx(intervalIdx)
, IntervalGroupGuard(NGroupedMemoryManager::TScanMemoryLimiterOperator::BuildGroupGuard(Context->GetProcessMemoryControlId()))
, IntervalGroupGuard(NGroupedMemoryManager::TScanMemoryLimiterOperator::BuildGroupGuard(
Context->GetProcessMemoryControlId(), context->GetCommonContext()->GetScanId()))
, IntervalStateGuard(Context->GetCommonContext()->GetCounters().CreateIntervalStateGuard()) {
AFL_VERIFY(Sources.size());
for (auto&& [_, i] : Sources) {
if (!i->IsDataReady()) {
++WaitSourcesCount;
} else {
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "ready_source")("interval_idx", IntervalIdx)("interval_id", GetIntervalId());
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "ready_source")("interval_idx", IntervalIdx)(
"interval_id", GetIntervalId());
}
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "register_source")("interval_idx", IntervalIdx)("interval_id", GetIntervalId());
i->RegisterInterval(*this, i);
Expand Down Expand Up @@ -81,8 +83,8 @@ void TFetchingInterval::OnPartSendingComplete() {

auto task = std::make_shared<TContinueMergeTask>(MergingContext, Context, std::move(Merger));
task->SetPriority(NConveyor::ITask::EPriority::High);
NGroupedMemoryManager::TScanMemoryLimiterOperator::SendToAllocation(
Context->GetProcessMemoryControlId(), GetIntervalId(), { task }, (ui32)EStageFeaturesIndexes::Merge);
NGroupedMemoryManager::TScanMemoryLimiterOperator::SendToAllocation(Context->GetProcessMemoryControlId(),
Context->GetCommonContext()->GetScanId(), GetIntervalId(), { task }, (ui32)EStageFeaturesIndexes::Merge);
}

} // namespace NKikimr::NOlap::NReader::NPlain
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,11 @@ void TScanHead::OnIntervalResult(std::shared_ptr<NGroupedMemoryManager::TAllocat
auto it = ReadyIntervals.find(intervalIdx);
if (it == ReadyIntervals.end()) {
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "interval_result_absent")("interval_idx", intervalIdx)(
"merger", interval->HasMerger())("intervalId", interval->GetIntervalId());
"merger", interval->HasMerger())("interval_id", interval->GetIntervalId());
break;
} else {
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "interval_result")("interval_idx", intervalIdx)("count",
it->second ? it->second->GetRecordsCount() : 0)("merger", interval->HasMerger())("intervalId", interval->GetIntervalId());
it->second ? it->second->GetRecordsCount() : 0)("merger", interval->HasMerger())("interval_id", interval->GetIntervalId());
}
auto result = it->second;
ReadyIntervals.erase(it);
Expand Down
20 changes: 15 additions & 5 deletions ydb/core/tx/limiter/grouped_memory/service/actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,24 +9,26 @@ void TMemoryLimiterActor::Bootstrap() {

void TMemoryLimiterActor::Handle(NEvents::TEvExternal::TEvStartTask::TPtr& ev) {
for (auto&& i : ev->Get()->GetAllocations()) {
Manager->RegisterAllocation(ev->Get()->GetExternalProcessId(), ev->Get()->GetExternalGroupId(), i, ev->Get()->GetStageFeaturesIdx());
Manager->RegisterAllocation(ev->Get()->GetExternalProcessId(), ev->Get()->GetExternalScopeId(), ev->Get()->GetExternalGroupId(), i,
ev->Get()->GetStageFeaturesIdx());
}
}

void TMemoryLimiterActor::Handle(NEvents::TEvExternal::TEvFinishTask::TPtr& ev) {
Manager->UnregisterAllocation(ev->Get()->GetExternalProcessId(), ev->Get()->GetAllocationId());
Manager->UnregisterAllocation(ev->Get()->GetExternalProcessId(), ev->Get()->GetExternalScopeId(), ev->Get()->GetAllocationId());
}

void TMemoryLimiterActor::Handle(NEvents::TEvExternal::TEvUpdateTask::TPtr& ev) {
Manager->UpdateAllocation(ev->Get()->GetExternalProcessId(), ev->Get()->GetAllocationId(), ev->Get()->GetVolume());
Manager->UpdateAllocation(
ev->Get()->GetExternalProcessId(), ev->Get()->GetExternalScopeId(), ev->Get()->GetAllocationId(), ev->Get()->GetVolume());
}

void TMemoryLimiterActor::Handle(NEvents::TEvExternal::TEvFinishGroup::TPtr& ev) {
Manager->UnregisterGroup(ev->Get()->GetExternalProcessId(), ev->Get()->GetExternalGroupId());
Manager->UnregisterGroup(ev->Get()->GetExternalProcessId(), ev->Get()->GetExternalScopeId(), ev->Get()->GetExternalGroupId());
}

void TMemoryLimiterActor::Handle(NEvents::TEvExternal::TEvStartGroup::TPtr& ev) {
Manager->RegisterGroup(ev->Get()->GetExternalProcessId(), ev->Get()->GetExternalGroupId());
Manager->RegisterGroup(ev->Get()->GetExternalProcessId(), ev->Get()->GetExternalScopeId(), ev->Get()->GetExternalGroupId());
}

void TMemoryLimiterActor::Handle(NEvents::TEvExternal::TEvFinishProcess::TPtr& ev) {
Expand All @@ -37,4 +39,12 @@ void TMemoryLimiterActor::Handle(NEvents::TEvExternal::TEvStartProcess::TPtr& ev
Manager->RegisterProcess(ev->Get()->GetExternalProcessId(), ev->Get()->GetStages());
}

void TMemoryLimiterActor::Handle(NEvents::TEvExternal::TEvFinishProcessScope::TPtr& ev) {
Manager->UnregisterProcessScope(ev->Get()->GetExternalProcessId(), ev->Get()->GetExternalScopeId());
}

void TMemoryLimiterActor::Handle(NEvents::TEvExternal::TEvStartProcessScope::TPtr& ev) {
Manager->RegisterProcessScope(ev->Get()->GetExternalProcessId(), ev->Get()->GetExternalScopeId());
}

} // namespace NKikimr::NOlap::NGroupedMemoryManager
4 changes: 4 additions & 0 deletions ydb/core/tx/limiter/grouped_memory/service/actor.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ class TMemoryLimiterActor: public NActors::TActorBootstrapped<TMemoryLimiterActo
void Handle(NEvents::TEvExternal::TEvFinishGroup::TPtr& ev);
void Handle(NEvents::TEvExternal::TEvStartProcess::TPtr& ev);
void Handle(NEvents::TEvExternal::TEvFinishProcess::TPtr& ev);
void Handle(NEvents::TEvExternal::TEvStartProcessScope::TPtr& ev);
void Handle(NEvents::TEvExternal::TEvFinishProcessScope::TPtr& ev);

void Bootstrap();

Expand All @@ -46,6 +48,8 @@ class TMemoryLimiterActor: public NActors::TActorBootstrapped<TMemoryLimiterActo
hFunc(NEvents::TEvExternal::TEvFinishGroup, Handle);
hFunc(NEvents::TEvExternal::TEvStartProcess, Handle);
hFunc(NEvents::TEvExternal::TEvFinishProcess, Handle);
hFunc(NEvents::TEvExternal::TEvStartProcessScope, Handle);
hFunc(NEvents::TEvExternal::TEvFinishProcessScope, Handle);
default:
AFL_VERIFY(false)("ev_type", ev->GetTypeName());
}
Expand Down
4 changes: 3 additions & 1 deletion ydb/core/tx/limiter/grouped_memory/service/allocation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,14 @@

namespace NKikimr::NOlap::NGroupedMemoryManager {

TAllocationInfo::TAllocationInfo(const ui64 processId, const ui64 allocationInternalGroupId, const std::shared_ptr<IAllocation>& allocation,
TAllocationInfo::TAllocationInfo(const ui64 processId, const ui64 scopeId, const ui64 allocationInternalGroupId,
const std::shared_ptr<IAllocation>& allocation,
const std::shared_ptr<TStageFeatures>& stage)
: Allocation(allocation)
, AllocationInternalGroupId(allocationInternalGroupId)
, Identifier(TValidator::CheckNotNull(Allocation)->GetIdentifier())
, ProcessId(processId)
, ScopeId(scopeId)
, Stage(stage) {
AFL_VERIFY(Stage);
AFL_VERIFY(Allocation);
Expand Down
Loading