diff --git a/ydb/core/tx/columnshard/engines/changes/general_compaction.cpp b/ydb/core/tx/columnshard/engines/changes/general_compaction.cpp index 631b380e320a..2b47f9b8a736 100644 --- a/ydb/core/tx/columnshard/engines/changes/general_compaction.cpp +++ b/ydb/core/tx/columnshard/engines/changes/general_compaction.cpp @@ -275,4 +275,30 @@ std::shared_ptr TGeneralCo } } +ui64 TGeneralCompactColumnEngineChanges::TMemoryPredictorChunkedPolicy::AddPortion(const TPortionInfo& portionInfo) { + SumMemoryFix += portionInfo.GetRecordsCount() * (2 * sizeof(ui64) + sizeof(ui32) + sizeof(ui16)); + ++PortionsCount; + THashMap maxChunkSizeByColumn; + for (auto&& i : portionInfo.GetRecords()) { + SumMemoryFix += i.BlobRange.Size; + auto it = maxChunkSizeByColumn.find(i.GetColumnId()); + if (it == maxChunkSizeByColumn.end()) { + maxChunkSizeByColumn.emplace(i.GetColumnId(), i.GetMeta().GetRawBytesVerified()); + } else { + if (it->second < i.GetMeta().GetRawBytesVerified()) { + it->second = i.GetMeta().GetRawBytesVerified(); + } + } + } + + SumMemoryDelta = 0; + for (auto&& i : maxChunkSizeByColumn) { + MaxMemoryByColumnChunk[i.first] += i.second; + SumMemoryDelta = std::max(SumMemoryDelta, MaxMemoryByColumnChunk[i.first]); + } + + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("memory_prediction_after", SumMemoryFix + SumMemoryDelta)("portion_info", portionInfo.DebugString()); + return SumMemoryFix + SumMemoryDelta; +} + } diff --git a/ydb/core/tx/columnshard/engines/changes/general_compaction.h b/ydb/core/tx/columnshard/engines/changes/general_compaction.h index 506966ac3494..287ba34b37ae 100644 --- a/ydb/core/tx/columnshard/engines/changes/general_compaction.h +++ b/ydb/core/tx/columnshard/engines/changes/general_compaction.h @@ -50,27 +50,12 @@ class TGeneralCompactColumnEngineChanges: public TCompactColumnEngineChanges { class TMemoryPredictorChunkedPolicy: public IMemoryPredictor { private: - ui64 SumMemory = 0; + ui64 SumMemoryDelta = 0; + ui64 SumMemoryFix = 0; ui32 PortionsCount = 0; THashMap MaxMemoryByColumnChunk; public: - virtual ui64 AddPortion(const TPortionInfo& portionInfo) override { - SumMemory += portionInfo.GetRecordsCount() * (2 * sizeof(ui64) + sizeof(ui32) + sizeof(ui16)); - for (auto&& i : portionInfo.GetRecords()) { - SumMemory += i.BlobRange.Size; - auto it = MaxMemoryByColumnChunk.find(i.GetColumnId()); - ++PortionsCount; - if (it == MaxMemoryByColumnChunk.end()) { - it = MaxMemoryByColumnChunk.emplace(i.GetColumnId(), i.GetMeta().GetRawBytesVerified()).first; - SumMemory += it->second * PortionsCount; - } else if (it->second < i.GetMeta().GetRawBytesVerified()) { - SumMemory -= it->second * (PortionsCount - 1); - it->second = i.GetMeta().GetRawBytesVerified(); - SumMemory += it->second * PortionsCount; - } - } - return SumMemory; - } + virtual ui64 AddPortion(const TPortionInfo& portionInfo) override; }; static std::shared_ptr BuildMemoryPredictor();