Skip to content

Commit

Permalink
wip.1206.1
Browse files Browse the repository at this point in the history
  • Loading branch information
lgbo-ustc committed Dec 6, 2023
1 parent 491d199 commit ba75b5d
Show file tree
Hide file tree
Showing 10 changed files with 261 additions and 775 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -96,10 +96,12 @@ class HashAggregateMetricsUpdater(val metrics: Map[String, SQLMetric])
object HashAggregateMetricsUpdater {
val INCLUDING_PROCESSORS = Array(
"AggregatingTransform",
"StreamingAggregatingTransform",
"MergingAggregatedTransform",
"ScalableMergingAggregatedTransform")
"GraceMergingAggregatedTransform")
val CH_PLAN_NODE_NAME = Array(
"AggregatingTransform",
"StreamingAggregatingTransform",
"MergingAggregatedTransform",
"ScalableMergingAggregatedTransform")
"GraceMergingAggregatedTransform")
}
Original file line number Diff line number Diff line change
Expand Up @@ -356,7 +356,7 @@ class GlutenClickHouseTPCHMetricsSuite extends GlutenClickHouseTPCHAbstractSuite
.getSteps
.get(0)
.getName
.equals("ScalableMergingAggregated"))
.equals("GraceMergingAggregatedTransform"))
assert(
nativeMetricsDataFinal.metricsDataList.get(1).getSteps.get(1).getName.equals("Expression"))
assert(nativeMetricsDataFinal.metricsDataList.get(2).getName.equals("kProject"))
Expand Down
51 changes: 46 additions & 5 deletions cpp-ch/local-engine/Operator/GraceMergingAggregatedStep.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,17 @@ void GraceMergingAggregatedTransform::extendBuckets()
void GraceMergingAggregatedTransform::rehashDataVariants()
{
auto blocks = params->aggregator.convertToBlocks(*current_data_variants, false, 1);

size_t block_rows = 0;
size_t block_memory_usage = 0;
for (const auto & block : blocks)
{
block_rows += block.rows();
block_memory_usage += block.allocatedBytes();
}
if (block_rows)
per_key_memory_usage = block_memory_usage * 1.0 / block_rows;

current_data_variants = std::make_shared<DB::AggregatedDataVariants>();
no_more_keys = false;
for (auto & block : blocks)
Expand Down Expand Up @@ -401,12 +412,42 @@ bool GraceMergingAggregatedTransform::isMemoryOverFlow()
auto mem_limit = context->getSettingsRef().max_bytes_before_external_group_by;
if (!mem_limit)
return false;
auto current_memory_usage = getMemoryUsage();
if (current_memory_usage > mem_limit)
auto max_mem_limit = context->getSettingsRef().max_memory_usage * 8 / 10;
if (per_key_memory_usage <= 0 || !max_mem_limit)
{
LOG_INFO(logger, "memory is overflow. used: {}, limit: {}", ReadableSize(current_memory_usage), ReadableSize(mem_limit));
return true;
if (getMemoryUsage() > mem_limit)
{
LOG_INFO(
logger,
"memory is overflow. this: {}, used: {}, limit: {}/{}",
fmt::ptr(this),
ReadableSize(getMemoryUsage()),
ReadableSize(mem_limit),
ReadableSize(max_mem_limit));
return true;
}
return false;
}
else
{
auto current_mem_used = getMemoryUsage();
// max usage is 80% of max_memory_usage
if (current_mem_used > max_mem_limit)
return true;
auto current_result_rows = current_data_variants->size();
if (current_mem_used + per_key_memory_usage * current_result_rows > max_mem_limit)
{
LOG_INFO(
logger,
"memory is overflow. this: {}, current_mem_used: {}, per_key_memory_usage: {}, current_result_rows: {}, max_mem_used: {}",
fmt::ptr(this),
ReadableSize(current_mem_used),
ReadableSize(per_key_memory_usage),
current_result_rows,
ReadableSize(max_mem_limit));
return true;
}
return false;
}
return false;
}
}
2 changes: 2 additions & 0 deletions cpp-ch/local-engine/Operator/GraceMergingAggregatedStep.h
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,8 @@ class GraceMergingAggregatedTransform : public DB::IProcessor
DB::BlocksList current_final_blocks;
bool no_more_keys = false;

double per_key_memory_usage = 0;

// metrics
size_t total_input_blocks = 0;
size_t total_input_rows = 0;
Expand Down
Loading

0 comments on commit ba75b5d

Please sign in to comment.