Commit
update
lgbo-ustc committed Nov 24, 2023
1 parent df4d572 commit 83c1af3
Showing 6 changed files with 93 additions and 166 deletions.
4 changes: 0 additions & 4 deletions cpp-ch/local-engine/Common/QueryContext.cpp
@@ -62,14 +62,10 @@ int64_t initializeQuery(ReservationListenerWrapperPtr listener)
auto allocator_id = reinterpret_cast<int64_t>(allocator_context.get());
CurrentMemoryTracker::before_alloc = [listener](Int64 size, bool throw_if_memory_exceed) -> void
{
#if 0
if (throw_if_memory_exceed)
listener->reserveOrThrow(size);
else
listener->reserve(size);
#else
listener->reserve(size);
#endif
};
CurrentMemoryTracker::before_free = [listener](Int64 size) -> void { listener->free(size); };
allocator_map.insert(allocator_id, allocator_context);
104 changes: 49 additions & 55 deletions cpp-ch/local-engine/Operator/ScalableMergingAggregatedStep.cpp
@@ -1,4 +1,5 @@
#include <iterator>
#include <type_traits>
#include <typeinfo>
#include <Operator/ScalableMergingAggregatedStep.h>
#include <Processors/Transforms/AggregatingTransform.h>
@@ -86,34 +87,30 @@ ScalableMergingAggregatedTransform::ScalableMergingAggregatedTransform(
, params(params_)
, context(context_)
, tmp_data_disk(std::make_unique<DB::TemporaryDataOnDisk>(context_->getTempDataOnDisk()))
{}
{
buckets_data_variants.resize(max_bucket_number + 1, nullptr);
bucket_tmp_files.resize(max_bucket_number + 1, nullptr);
}

bool ScalableMergingAggregatedTransform::isMemoryOverFlow()
{
#if 1
UInt64 current_memory_usage = getMemoryUsage();
if (params->params.max_bytes_before_external_group_by && current_memory_usage > params->params.max_bytes_before_external_group_by)
{
LOG_ERROR(
LOG_INFO(
logger,
"xxx Memory is overflow. current_memory_usage: {}, max_bytes_before_external_group_by: {}",
"Memory is overflow. current_memory_usage: {}, max_bytes_before_external_group_by: {}",
ReadableSize(current_memory_usage),
ReadableSize(params->params.max_bytes_before_external_group_by));
return true;
}
return false;
#else
if (has_single_level || buckets_data_variants.size() > 1)
{
// LOG_ERROR(logger, "xxx memory overflow. has_single_level: {}, buckets_data_variants.size(): {}", has_single_level, buckets_data_variants.size());
return true;
}
return false;
#endif
}

void ScalableMergingAggregatedTransform::swithMode()
{
if (params->params.keys.empty())
throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Cannot switch mode for aggregation without keys");
has_two_level = true;
mode = OVERFLOW;
}
@@ -122,7 +119,6 @@ size_t ScalableMergingAggregatedTransform::spillBucketDataToDisk(Int32 bucket, D
{
if (!block.rows())
return 0;
// LOG_ERROR(logger, "xxx spillBucketDataToDisk. bucket: {}, block: {}", bucket, block.dumpStructure());
auto * tmp_file = getBucketTempFile(bucket);
block.info.bucket_num = bucket;
return tmp_file->write(block);
@@ -181,7 +177,6 @@ DB::IProcessor::Status ScalableMergingAggregatedTransform::prepare()
throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Unknown chunk info type.");
}

// Some cases that we cannot support at present.
has_input = true;
}
return Status::Ready;
@@ -200,7 +195,6 @@ DB::IProcessor::Status ScalableMergingAggregatedTransform::prepare()

void ScalableMergingAggregatedTransform::work()
{
// LOG_ERROR(&Poco::Logger::get("ScalableMergingAggregatedTransform"), "xxx work.");
if (has_input && isMemoryOverFlow())
{
swithMode();
@@ -214,20 +208,16 @@ void ScalableMergingAggregatedTransform::workImpl()
{
for (; current_bucket_num < max_bucket_number; ++current_bucket_num)
{
auto it = buckets_data_variants.find(current_bucket_num);
if (it != buckets_data_variants.end() && it->second)
auto bucket_data_variants = getBucketDataVariants(current_bucket_num);
if (bucket_data_variants)
{
auto block = params->aggregator.convertToSingleBlock(*it->second, true);
if (block)
LOG_ERROR(logger, "xxx load bucket from mem. bucket: {}, block: {}", current_bucket_num, block.dumpStructure());
it->second = nullptr;
auto block = params->aggregator.convertToSingleBlock(*bucket_data_variants, true);
releaseBucketDataVariants(current_bucket_num);
output_chunk = blockToChunk(block);
has_output = true;
return;
}
auto block = loadBucketDataFromDisk(current_bucket_num);
if (block)
LOG_ERROR(logger, "xxx load bucket from disk. bucket: {}, block: {}", current_bucket_num, block.dumpStructure());
auto block = loadBucketDataFromDiskAndMerge(current_bucket_num);
if (block)
{
output_chunk = blockToChunk(block);
@@ -245,12 +235,10 @@ void ScalableMergingAggregatedTransform::workImpl()
auto bucket_data_variants = getBucketDataVariants(bucket_num, create_on_miss);
if (bucket_data_variants)
{
LOG_ERROR(logger, "xxx add block into aggregator. bucket: {}, block: {}", bucket_num, block.dumpStructure());
params->aggregator.mergeOnBlock(block, *bucket_data_variants, no_more_keys);
}
else
{
LOG_ERROR(logger, "xxx add block into disk. bucket: {}, block: {}", bucket_num, block.dumpStructure());
spillBucketDataToDisk(bucket_num, block);
}
};
@@ -260,7 +248,6 @@ void ScalableMergingAggregatedTransform::workImpl()
auto bucket_data_variants = getBucketDataVariants(-1, false);
if (bucket_data_variants)
{
// LOG_ERROR(logger, "xxx split bucket -1");
auto blocks_list = params->aggregator.convertToBlocks(*bucket_data_variants, false, 1);
if (blocks_list.size() > 1)
{
@@ -275,7 +262,7 @@ void ScalableMergingAggregatedTransform::workImpl()
for (auto & blk : blocks_vector)
add_block(blk, blk.info.bucket_num, true);
}
buckets_data_variants.erase(-1);
releaseBucketDataVariants(-1);
}
has_single_level = false;
}
@@ -287,52 +274,55 @@ void ScalableMergingAggregatedTransform::workImpl()

/// If we have at least one two level block, transform all single level blocks into two level blocks.
const auto * agg_info = typeid_cast<const DB::AggregatedChunkInfo *>(input_chunk.getChunkInfo().get());
// LOG_ERROR(logger, "xxx input one chunk, bucket: {}, has_single_level: {}, has_two_level: {}, OOM: {}", agg_info->bucket_num, has_single_level, has_two_level, mode);
if (agg_info->bucket_num == -1 && (has_two_level || mode == OVERFLOW))
{
auto block = chunkToBlock(input_chunk);
auto block_struct = block.dumpStructure();
auto blocks_vector = params->aggregator.convertBlockToTwoLevel(block);
// LOG_ERROR(logger, "xxx split bucket -1 into {} buckets. block: {}", blocks_vector.size(), block_struct);
for (auto & blk : blocks_vector)
{
// LOG_ERROR(logger, "xxx bucket {} from -1. block: {}", blk.info.bucket_num, blk.dumpStructure());
add_block(blk, blk.info.bucket_num, false);
add_block(blk, blk.info.bucket_num, mode != OVERFLOW);
}
has_single_level = false;
}
else
{
auto block = chunkToBlock(input_chunk);
add_block(block, agg_info->bucket_num, false);
add_block(block, agg_info->bucket_num, mode != OVERFLOW);
}
input_chunk = {};
has_input = false;
}
}
DB::AggregatedDataVariantsPtr ScalableMergingAggregatedTransform::getBucketDataVariants(Int32 bucket, bool create_on_miss, size_t size_hint [[maybe_unused]])
{
auto it = buckets_data_variants.find(bucket);
if (it != buckets_data_variants.end() && it->second)
return it->second;
if (!create_on_miss)
return nullptr;
auto result = params->aggregator.buildAggregatedDataVariants();
buckets_data_variants[bucket] = result;
return result;
UInt32 index = static_cast<UInt32>(1 + bucket);
if (!buckets_data_variants[index] && create_on_miss)
{
in_memory_buckets_num += 1;
buckets_data_variants[index] = std::make_shared<DB::AggregatedDataVariants>();
}
return buckets_data_variants[index];
}

DB::Block ScalableMergingAggregatedTransform::loadBucketDataFromDisk(Int32 bucket)
void ScalableMergingAggregatedTransform::releaseBucketDataVariants(Int32 bucket)
{
auto it = bucket_tmp_files.find(bucket);
if (it == bucket_tmp_files.end())
UInt32 index = static_cast<UInt32>(1 + bucket);
buckets_data_variants[index] = nullptr;
in_memory_buckets_num -= 1;
}

DB::Block ScalableMergingAggregatedTransform::loadBucketDataFromDiskAndMerge(Int32 bucket)
{
UInt32 index = static_cast<UInt32>(1 + bucket);
if (!bucket_tmp_files[index])
return {};
it->second->finishWriting();
auto data_variant = params->aggregator.buildAggregatedDataVariants();
bucket_tmp_files[index]->finishWriting();
auto data_variant = std::make_shared<DB::AggregatedDataVariants>();
bool has_data = false;
while(true)
{
auto block = it->second->read();
auto block = bucket_tmp_files[index]->read();
if (!block)
break;
has_data = true;
@@ -378,16 +368,20 @@ Int64 ScalableMergingAggregatedTransform::getMemoryUsage()

void ScalableMergingAggregatedTransform::spillOneBucket()
{
if (buckets_data_variants.size() <= 1)

if (in_memory_buckets_num <= 1)
{
throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Cannot spill one bucket.");
}
auto it = buckets_data_variants.rbegin();
auto block = params->aggregator.convertToSingleBlock(*it->second, false);
// LOG_ERROR(logger, "xxx bucket: {}, spill output header: {}", it->first, block.dumpStructure());
auto write_bytes = spillBucketDataToDisk(it->first, block);
it->second = nullptr;
buckets_data_variants.erase(it->first);
// LOG_ERROR(logger, "Spill bucket {} to disk. write_bytes: {}. block bytes: {}", it->first, write_bytes, block.allocatedBytes());
for (size_t i = max_bucket_number; i > 0; --i)
{
if (buckets_data_variants[i])
{
auto block = params->aggregator.convertToSingleBlock(*buckets_data_variants[i], false);
auto write_bytes = spillBucketDataToDisk(i - 1, block);
releaseBucketDataVariants(i - 1);
break;
}
}
}
}
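
Note on the bookkeeping change above: the rewrite drops the std::map<Int32, ...> containers in favour of flat vectors indexed by bucket + 1, with slot 0 reserved for ClickHouse's single-level bucket -1, and it tracks the number of in-memory buckets explicitly. Below is a minimal, hedged sketch of that scheme (Block, AggState and TmpFile are toy stand-ins, not the DB::AggregatedDataVariants or DB::TemporaryFileStream types from the diff), including the highest-bucket-first walk that spillOneBucket() performs:

#include <cstdint>
#include <memory>
#include <vector>

struct Block { int64_t rows = 0; };            // stand-in for DB::Block
struct AggState { int64_t rows = 0; };         // stand-in for DB::AggregatedDataVariants
struct TmpFile { std::vector<Block> blocks; }; // stand-in for DB::TemporaryFileStream

class BucketStore
{
public:
    explicit BucketStore(int32_t max_bucket_number)
        : states(static_cast<size_t>(max_bucket_number) + 1)
        , files(static_cast<size_t>(max_bucket_number) + 1)
    {}

    // Bucket -1 (single-level data) maps to slot 0, two-level buckets 0..N-1 to slots 1..N.
    static size_t slot(int32_t bucket) { return static_cast<size_t>(bucket + 1); }

    AggState & getOrCreate(int32_t bucket)               // ~ getBucketDataVariants(bucket, true)
    {
        auto & state = states[slot(bucket)];
        if (!state)
        {
            state = std::make_unique<AggState>();
            ++in_memory_buckets;
        }
        return *state;
    }

    void release(int32_t bucket)                          // ~ releaseBucketDataVariants(bucket)
    {
        states[slot(bucket)].reset();
        --in_memory_buckets;
    }

    void spillOneBucket()                                 // spill the highest in-memory bucket
    {
        for (size_t i = states.size() - 1; i > 0; --i)
        {
            if (states[i])
            {
                // In the real code: convertToSingleBlock() followed by spillBucketDataToDisk().
                files[i].blocks.push_back(Block{states[i]->rows});
                release(static_cast<int32_t>(i) - 1);
                return;
            }
        }
    }

    size_t in_memory_buckets = 0;

private:
    std::vector<std::unique_ptr<AggState>> states;
    std::vector<TmpFile> files;
};

Since the number of two-level buckets is fixed and known up front, indexing a pre-sized vector is O(1) and avoids the per-lookup tree traversal and node allocation of std::map, which is presumably why the commit switches containers.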
22 changes: 12 additions & 10 deletions cpp-ch/local-engine/Operator/ScalableMergingAggregatedStep.h
@@ -16,6 +16,9 @@
namespace local_engine
{

/// A more memory-efficient implementation for merging aggregated data into final blocks.
/// If the memory consumption exceeds the limit, we will split data into 256 buckets and spill
/// some of them to disk.
class ScalableMergingAggregatedStep : public DB::ITransformingStep
{
public:
@@ -68,13 +71,12 @@ class ScalableMergingAggregatedTransform : public DB::IProcessor
Mode mode = NORMAL;
bool has_single_level = false;
bool has_two_level = false;

/// This only used in NORMAL mode
std::map<Int32, DB::AggregatedDataVariantsPtr> buckets_data_variants;
size_t in_memory_buckets_num = 0;
std::vector<DB::AggregatedDataVariantsPtr> buckets_data_variants;

Int32 current_bucket_num = -1;
DB::TemporaryDataOnDiskPtr tmp_data_disk;
std::map<Int32, DB::TemporaryFileStream *> bucket_tmp_files;
std::vector<DB::TemporaryFileStream *> bucket_tmp_files;

bool no_more_keys = false;

@@ -84,17 +86,17 @@ class ScalableMergingAggregatedTransform : public DB::IProcessor

inline DB::TemporaryFileStream * getBucketTempFile(Int32 bucket)
{
auto it = bucket_tmp_files.find(bucket);
if (it == bucket_tmp_files.end())
UInt32 index = static_cast<UInt32>(bucket + 1);
if (!bucket_tmp_files[index])
{
auto * tmp_file = &tmp_data_disk->createStream(header);
bucket_tmp_files[bucket] = tmp_file;
return tmp_file;
bucket_tmp_files[index] = tmp_file;
}
return it->second;
return bucket_tmp_files[index];
}

DB::AggregatedDataVariantsPtr getBucketDataVariants(Int32 bucket, bool create_on_miss = false, size_t size_hint = 0);
void releaseBucketDataVariants(Int32 bucket);

bool input_finished = false;
DB::Chunk input_chunk;
@@ -106,7 +108,7 @@ class ScalableMergingAggregatedTransform : public DB::IProcessor
// Append one block to the tmp file.
size_t spillBucketDataToDisk(Int32 bucket, DB::Block block);
/// Load all blocks from the tmp file and merge them.
DB::Block loadBucketDataFromDisk(Int32 bucket);
DB::Block loadBucketDataFromDiskAndMerge(Int32 bucket);

DB::Block chunkToBlock(DB::Chunk & chunk);
DB::Chunk blockToChunk(DB::Block & block);
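
Relating the class comment above to the workImpl() changes in the .cpp diff, the following is a hedged sketch of the per-bucket output phase, with toy types and with convertToSingleBlock and mergeBlocks as assumed stand-ins for the Aggregator calls. In this sketch the cursor is not advanced after emitting a bucket's in-memory block, so any data spilled for that same bucket is merged from disk on the next call; the diff's loop appears to behave the same way, returning before the increment in its for statement.

#include <cstdint>
#include <optional>
#include <vector>

struct Block { int64_t rows = 0; };

// Assumed helpers standing in for Aggregator::convertToSingleBlock and block merging.
static Block convertToSingleBlock(int64_t rows) { return Block{rows}; }
static Block mergeBlocks(const std::vector<Block> & blocks)
{
    Block merged;
    for (const auto & b : blocks)
        merged.rows += b.rows;   // the real code merges aggregation states, not row counts
    return merged;
}

// One call produces at most one output block, like a single workImpl() invocation.
std::optional<Block> nextOutput(
    int32_t & current_bucket,
    std::vector<int64_t> & in_memory,          // rows per bucket slot, 0 = nothing in memory
    std::vector<std::vector<Block>> & spilled) // blocks written to the bucket's tmp file
{
    const auto max_bucket_number = static_cast<int32_t>(in_memory.size());
    for (; current_bucket < max_bucket_number; ++current_bucket)
    {
        if (in_memory[current_bucket] != 0)
        {
            Block block = convertToSingleBlock(in_memory[current_bucket]);
            in_memory[current_bucket] = 0;     // ~ releaseBucketDataVariants()
            return block;                      // same bucket is revisited on the next call
        }
        if (!spilled[current_bucket].empty())
        {
            Block block = mergeBlocks(spilled[current_bucket]); // ~ loadBucketDataFromDiskAndMerge()
            spilled[current_bucket].clear();
            return block;
        }
    }
    return std::nullopt;                       // every bucket has been emitted
}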
52 changes: 23 additions & 29 deletions cpp-ch/local-engine/Parser/AggregateRelParser.cpp
@@ -234,39 +234,33 @@ void AggregateRelParser::buildAggregateDescriptions(AggregateDescriptions & desc

void AggregateRelParser::addMergingAggregatedStep()
{
#if 0
AggregateDescriptions aggregate_descriptions;
buildAggregateDescriptions(aggregate_descriptions);
auto settings = getContext()->getSettingsRef();
Aggregator::Params params(grouping_keys, aggregate_descriptions, false, settings.max_threads, settings.max_block_size);
auto merging_step = std::make_unique<DB::MergingAggregatedStep>(
plan->getCurrentDataStream(),
params,
true,
true,
1,
1,
false,
settings.max_block_size,
settings.aggregation_in_order_max_block_bytes,
SortDescription(),
settings.enable_memory_bound_merging_of_aggregation_results);
steps.emplace_back(merging_step.get());
plan->addStep(std::move(merging_step));
#else
AggregateDescriptions aggregate_descriptions;
buildAggregateDescriptions(aggregate_descriptions);
auto settings = getContext()->getSettingsRef();
Aggregator::Params params(grouping_keys, aggregate_descriptions, false, settings.max_threads, settings.max_block_size);
auto merging_step = std::make_unique<ScalableMergingAggregatedStep>(
getContext(),
plan->getCurrentDataStream(),
params,
false);
steps.emplace_back(merging_step.get());
plan->addStep(std::move(merging_step));

#endif
if (settings.distributed_aggregation_memory_efficient)
{
auto merging_step = std::make_unique<ScalableMergingAggregatedStep>(getContext(), plan->getCurrentDataStream(), params, false);
steps.emplace_back(merging_step.get());
plan->addStep(std::move(merging_step));
}
else
{
auto merging_step = std::make_unique<DB::MergingAggregatedStep>(
plan->getCurrentDataStream(),
params,
true,
false,
1,
1,
false,
settings.max_block_size,
settings.aggregation_in_order_max_block_bytes,
SortDescription(),
settings.enable_memory_bound_merging_of_aggregation_results);
steps.emplace_back(merging_step.get());
plan->addStep(std::move(merging_step));
}
}

void AggregateRelParser::addAggregatingStep()