diff --git a/backends-clickhouse/src/main/scala/io/glutenproject/backendsapi/clickhouse/CHTransformerApi.scala b/backends-clickhouse/src/main/scala/io/glutenproject/backendsapi/clickhouse/CHTransformerApi.scala index 17c8fa11add6..4aa1875bc79b 100644 --- a/backends-clickhouse/src/main/scala/io/glutenproject/backendsapi/clickhouse/CHTransformerApi.scala +++ b/backends-clickhouse/src/main/scala/io/glutenproject/backendsapi/clickhouse/CHTransformerApi.scala @@ -96,6 +96,12 @@ class CHTransformerApi extends TransformerApi with Logging { nativeConfMap.put(groupBySpillKey, groupBySpillValue.toLong.toString) } + val maxMemoryUsageKey = settingPrefix + "max_memory_usage"; + if (!nativeConfMap.containsKey(maxMemoryUsageKey)) { + val maxMemoryUsageValue = offHeapSize + nativeConfMap.put(maxMemoryUsageKey, maxMemoryUsageValue.toLong.toString) + } + // Only set default max_bytes_before_external_join for CH when join_algorithm is grace_hash val joinAlgorithmKey = settingPrefix + "join_algorithm"; if ( diff --git a/backends-clickhouse/src/main/scala/io/glutenproject/metrics/HashAggregateMetricsUpdater.scala b/backends-clickhouse/src/main/scala/io/glutenproject/metrics/HashAggregateMetricsUpdater.scala index 3dbb075a1f91..2a6015a280c3 100644 --- a/backends-clickhouse/src/main/scala/io/glutenproject/metrics/HashAggregateMetricsUpdater.scala +++ b/backends-clickhouse/src/main/scala/io/glutenproject/metrics/HashAggregateMetricsUpdater.scala @@ -94,6 +94,14 @@ class HashAggregateMetricsUpdater(val metrics: Map[String, SQLMetric]) } object HashAggregateMetricsUpdater { - val INCLUDING_PROCESSORS = Array("AggregatingTransform", "MergingAggregatedTransform") - val CH_PLAN_NODE_NAME = Array("AggregatingTransform", "MergingAggregatedTransform") + val INCLUDING_PROCESSORS = Array( + "AggregatingTransform", + "StreamingAggregatingTransform", + "MergingAggregatedTransform", + "GraceMergingAggregatedTransform") + val CH_PLAN_NODE_NAME = Array( + "AggregatingTransform", + "StreamingAggregatingTransform", + "MergingAggregatedTransform", + "GraceMergingAggregatedTransform") } diff --git a/backends-clickhouse/src/test/scala/io/glutenproject/execution/GlutenClickHouseTPCHColumnarShuffleParquetAQESuite.scala b/backends-clickhouse/src/test/scala/io/glutenproject/execution/GlutenClickHouseTPCHColumnarShuffleParquetAQESuite.scala index 0814c3c8c7d7..51b685f0fdf7 100644 --- a/backends-clickhouse/src/test/scala/io/glutenproject/execution/GlutenClickHouseTPCHColumnarShuffleParquetAQESuite.scala +++ b/backends-clickhouse/src/test/scala/io/glutenproject/execution/GlutenClickHouseTPCHColumnarShuffleParquetAQESuite.scala @@ -66,8 +66,6 @@ class GlutenClickHouseTPCHColumnarShuffleParquetAQESuite assert(plans(2).metrics("outputRows").value === 600572) assert(plans(1).metrics("inputRows").value === 591673) - assert(plans(1).metrics("resizeInputRows").value === 4) - assert(plans(1).metrics("resizeOutputRows").value === 4) assert(plans(1).metrics("outputRows").value === 4) assert(plans(1).metrics("outputVectors").value === 1) @@ -93,8 +91,6 @@ class GlutenClickHouseTPCHColumnarShuffleParquetAQESuite assert(plans(2).metrics("filesSize").value === 17777735) assert(plans(1).metrics("inputRows").value === 591673) - assert(plans(1).metrics("resizeInputRows").value === 4) - assert(plans(1).metrics("resizeOutputRows").value === 4) assert(plans(1).metrics("outputRows").value === 4) assert(plans(1).metrics("outputVectors").value === 1) diff --git a/backends-clickhouse/src/test/scala/io/glutenproject/execution/metrics/GlutenClickHouseTPCHMetricsSuite.scala b/backends-clickhouse/src/test/scala/io/glutenproject/execution/metrics/GlutenClickHouseTPCHMetricsSuite.scala index 91fbc57b6c08..d81141f9c09c 100644 --- a/backends-clickhouse/src/test/scala/io/glutenproject/execution/metrics/GlutenClickHouseTPCHMetricsSuite.scala +++ b/backends-clickhouse/src/test/scala/io/glutenproject/execution/metrics/GlutenClickHouseTPCHMetricsSuite.scala @@ -311,15 +311,14 @@ class GlutenClickHouseTPCHMetricsSuite extends GlutenClickHouseTPCHAbstractSuite .get(0) .getProcessors .get(0) - .getInputRows == 591677) - + .getInputRows == 591673) assert( nativeMetricsData.metricsDataList .get(4) .getSteps .get(0) .getProcessors - .get(1) + .get(0) .getOutputRows == 4) assert( @@ -356,7 +355,7 @@ class GlutenClickHouseTPCHMetricsSuite extends GlutenClickHouseTPCHAbstractSuite .getSteps .get(0) .getName - .equals("MergingAggregated")) + .equals("GraceMergingAggregatedStep")) assert( nativeMetricsDataFinal.metricsDataList.get(1).getSteps.get(1).getName.equals("Expression")) assert(nativeMetricsDataFinal.metricsDataList.get(2).getName.equals("kProject")) diff --git a/cpp-ch/local-engine/Operator/GraceMergingAggregatedStep.cpp b/cpp-ch/local-engine/Operator/GraceMergingAggregatedStep.cpp new file mode 100644 index 000000000000..0307404e26d0 --- /dev/null +++ b/cpp-ch/local-engine/Operator/GraceMergingAggregatedStep.cpp @@ -0,0 +1,455 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "GraceMergingAggregatedStep.h" +#include +#include +#include +#include +#include + +namespace DB +{ +namespace ErrorCodes +{ + extern const int LOGICAL_ERROR; +} +} + +namespace local_engine +{ +static DB::ITransformingStep::Traits getTraits() +{ + return DB::ITransformingStep::Traits + { + { + .preserves_number_of_streams = false, + .preserves_sorting = false, + }, + { + .preserves_number_of_rows = false, + } + }; +} + +static DB::Block buildOutputHeader(const DB::Block & input_header_, const DB::Aggregator::Params params_) +{ + return params_.getHeader(input_header_, true); +} + +GraceMergingAggregatedStep::GraceMergingAggregatedStep( + DB::ContextPtr context_, + const DB::DataStream & input_stream_, + DB::Aggregator::Params params_) + : DB::ITransformingStep( + input_stream_, buildOutputHeader(input_stream_.header, params_), getTraits()) + , context(context_) + , params(std::move(params_)) +{ +} + +void GraceMergingAggregatedStep::transformPipeline(DB::QueryPipelineBuilder & pipeline, const DB::BuildQueryPipelineSettings &) +{ + auto num_streams = pipeline.getNumStreams(); + auto transform_params = std::make_shared(pipeline.getHeader(), params, true); + pipeline.resize(1); + auto build_transform = [&](DB::OutputPortRawPtrs outputs) + { + DB::Processors new_processors; + for (auto & output : outputs) + { + auto op = std::make_shared(pipeline.getHeader(), transform_params, context); + new_processors.push_back(op); + DB::connect(*output, op->getInputs().front()); + } + return new_processors; + }; + pipeline.transform(build_transform); + pipeline.resize(num_streams, true); +} + +void GraceMergingAggregatedStep::describeActions(DB::IQueryPlanStep::FormatSettings & settings) const +{ + return params.explain(settings.out, settings.offset); +} + +void GraceMergingAggregatedStep::describeActions(DB::JSONBuilder::JSONMap & map) const +{ + params.explain(map); +} + +void GraceMergingAggregatedStep::updateOutputStream() +{ + output_stream = createOutputStream(input_streams.front(), buildOutputHeader(input_streams.front().header, params), getDataStreamTraits()); +} + +GraceMergingAggregatedTransform::GraceMergingAggregatedTransform(const DB::Block &header_, DB::AggregatingTransformParamsPtr params_, DB::ContextPtr context_) + : IProcessor({header_}, {params_->getHeader()}) + , header(header_) + , params(params_) + , context(context_) + , tmp_data_disk(std::make_unique(context_->getTempDataOnDisk())) +{ + current_data_variants = std::make_shared(); + // bucket 0 is for in-memory data, it's just a placeholder. + buckets.emplace(0, BufferFileStream()); +} + +GraceMergingAggregatedTransform::~GraceMergingAggregatedTransform() +{ + LOG_INFO( + logger, + "Metrics. total_input_blocks: {}, total_input_rows: {}, total_output_blocks: {}, total_output_rows: {}, total_spill_disk_bytes: " + "{}, total_spill_disk_time: {}, total_read_disk_time: {}, total_scatter_time: {}", + total_input_blocks, + total_input_rows, + total_output_blocks, + total_output_rows, + total_spill_disk_bytes, + total_spill_disk_time, + total_read_disk_time, + total_scatter_time); +} + +GraceMergingAggregatedTransform::Status GraceMergingAggregatedTransform::prepare() +{ + auto & output = outputs.front(); + auto & input = inputs.front(); + if (output.isFinished()) + { + input.close(); + return Status::Finished; + } + if (has_output) + { + if (output.canPush()) + { + total_output_rows += output_chunk.getNumRows(); + total_output_blocks++; + output.push(std::move(output_chunk)); + has_output = false; + } + return Status::PortFull; + } + + if (has_input) + return Status::Ready; + + if (!input_finished) + { + if (input.isFinished()) + { + input_finished = true; + return Status::Ready; + } + input.setNeeded(); + if (!input.hasData()) + return Status::NeedData; + input_chunk = input.pull(true); + total_input_rows += input_chunk.getNumRows(); + total_input_blocks++; + has_input = true; + return Status::Ready; + } + + if (current_bucket_index >= getBucketsNum() && current_final_blocks.empty()) + { + output.finish(); + return Status::Finished; + } + return Status::Ready; +} + +void GraceMergingAggregatedTransform::work() +{ + if (has_input) + { + assert(!input_finished); + auto block = header.cloneWithColumns(input_chunk.detachColumns()); + mergeOneBlock(block); + has_input = false; + } + else + { + assert(input_finished); + auto pop_one_chunk = [&]() + { + while (!current_final_blocks.empty()) + { + if (!current_final_blocks.front().rows()) + { + current_final_blocks.pop_front(); + continue; + } + + auto & block = current_final_blocks.front(); + output_chunk = DB::Chunk(block.getColumns(), block.rows()); + current_final_blocks.pop_front(); + has_output = true; + return; + } + }; + + if (current_final_blocks.empty()) + { + if (current_bucket_index >= getBucketsNum()) + return; + prepareBucketOutputBlocks(); + current_bucket_index++; + current_data_variants = nullptr; + } + pop_one_chunk(); + } +} + +void GraceMergingAggregatedTransform::extendBuckets() +{ + auto current_size = getBucketsNum(); + auto next_size = current_size * 2; + if (next_size > max_buckets) + throw DB::Exception( + DB::ErrorCodes::LOGICAL_ERROR, + "Too many buckets, limit is {}. Please consider increate offhead size or partitoin number", + max_buckets); + LOG_INFO(logger, "extend buckets from {} to {}", current_size, next_size); + for (size_t i = current_size; i < next_size; ++i) + buckets.emplace(i, BufferFileStream()); +} + +void GraceMergingAggregatedTransform::rehashDataVariants() +{ + auto blocks = params->aggregator.convertToBlocks(*current_data_variants, false, 1); + + size_t block_rows = 0; + size_t block_memory_usage = 0; + for (const auto & block : blocks) + { + block_rows += block.rows(); + block_memory_usage += block.allocatedBytes(); + } + if (block_rows) + per_key_memory_usage = block_memory_usage * 1.0 / block_rows; + + current_data_variants = std::make_shared(); + no_more_keys = false; + for (auto & block : blocks) + { + auto scattered_blocks = scatterBlock(block); + block = {}; + for (size_t i = current_bucket_index + 1; i < getBucketsNum(); ++i) + { + addBlockIntoFileBucket(i, scattered_blocks[i]); + scattered_blocks[i] = {}; + } + params->aggregator.mergeOnBlock(scattered_blocks[current_bucket_index], *current_data_variants, no_more_keys); + } +}; + +DB::Blocks GraceMergingAggregatedTransform::scatterBlock(const DB::Block & block) +{ + if (!block.rows()) + return {}; + Stopwatch watch; + size_t bucket_num = getBucketsNum(); + if (static_cast(block.info.bucket_num) == bucket_num) + return {block}; + auto blocks = DB::JoinCommon::scatterBlockByHash(params->params.keys, block, bucket_num); + for (auto & new_block : blocks) + { + new_block.info.bucket_num = static_cast(bucket_num); + } + total_scatter_time += watch.elapsedMilliseconds(); + return blocks; +} + +void GraceMergingAggregatedTransform::addBlockIntoFileBucket(size_t bucket_index, const DB::Block & block) +{ + if (!block.rows()) + return; + auto & file_stream = buckets[bucket_index]; + file_stream.blocks.push_back(block); +} + +void GraceMergingAggregatedTransform::flushBuckets() +{ + auto before_mem = getMemoryUsage(); + size_t flush_bytes = 0; + Stopwatch watch; + for (size_t i = current_bucket_index + 1; i < getBucketsNum(); ++i) + flush_bytes += flushBucket(i); + total_spill_disk_time += watch.elapsedMilliseconds(); + total_spill_disk_bytes += flush_bytes; + LOG_INFO(logger, "flush {} in {} ms, memoery usage: {} -> {}", ReadableSize(flush_bytes), watch.elapsedMilliseconds(), ReadableSize(before_mem), ReadableSize(getMemoryUsage())); +} + +size_t GraceMergingAggregatedTransform::flushBucket(size_t bucket_index) +{ + auto & file_stream = buckets[bucket_index]; + if (file_stream.blocks.empty()) + return 0; + if (!file_stream.file_stream) + file_stream.file_stream = &tmp_data_disk->createStream(header); + DB::Blocks blocks; + size_t flush_bytes = 0; + while (!file_stream.blocks.empty()) + { + while (!file_stream.blocks.empty()) + { + if (!blocks.empty() && blocks.back().info.bucket_num != file_stream.blocks.front().info.bucket_num) + break; + blocks.push_back(file_stream.blocks.front()); + file_stream.blocks.pop_front(); + } + auto bucket = blocks.front().info.bucket_num; + auto merged_block = DB::concatenateBlocks(blocks); + merged_block.info.bucket_num = bucket; + blocks.clear(); + flush_bytes += merged_block.bytes(); + if (merged_block.rows()) + { + file_stream.file_stream->write(merged_block); + } + } + return flush_bytes; +} + +void GraceMergingAggregatedTransform::prepareBucketOutputBlocks() +{ + size_t read_bytes = 0; + size_t read_rows = 0; + Stopwatch watch; + if (!current_data_variants) + { + current_data_variants = std::make_shared(); + no_more_keys = false; + } + auto & buffer_file_stream = buckets[current_bucket_index]; + + if (buffer_file_stream.file_stream) + { + buffer_file_stream.file_stream->finishWriting(); + while (true) + { + auto block = buffer_file_stream.file_stream->read(); + if (!block.rows()) + break; + read_bytes += block.bytes(); + read_rows += block.rows(); + mergeOneBlock(block); + block = {}; + } + buffer_file_stream.file_stream = nullptr; + total_read_disk_time += watch.elapsedMilliseconds(); + } + if (!buffer_file_stream.blocks.empty()) + { + for (auto & block : buffer_file_stream.blocks) + { + mergeOneBlock(block); + block = {}; + } + } + current_final_blocks = params->aggregator.convertToBlocks(*current_data_variants, true, 1); + LOG_INFO(logger, "prepare to output bucket {}, read bytes: {}, read rows: {}, time: {} ms", current_bucket_index, ReadableSize(read_bytes), read_rows, watch.elapsedMilliseconds()); +} + +void GraceMergingAggregatedTransform::mergeOneBlock(const DB::Block &block) +{ + if (!block.rows()) + return; + + if (isMemoryOverflow()) + flushBuckets(); + + if (isMemoryOverflow()) + { + extendBuckets(); + rehashDataVariants(); + } + + LOG_TRACE( + logger, + "merge on block, rows: {}, bytes:{}, bucket: {}. current bucket: {}, total bucket: {}, mem used: {}", + block.rows(), + ReadableSize(block.bytes()), + block.info.bucket_num, + current_bucket_index, + getBucketsNum(), + ReadableSize(getMemoryUsage())); + + if (block.info.bucket_num == static_cast(getBucketsNum()) || getBucketsNum() == 1) + { + params->aggregator.mergeOnBlock(block, *current_data_variants, no_more_keys); + } + else + { + auto scattered_blocks = scatterBlock(block); + for (size_t i = current_bucket_index + 1; i < getBucketsNum(); ++i) + { + addBlockIntoFileBucket(i, scattered_blocks[i]); + } + params->aggregator.mergeOnBlock(scattered_blocks[current_bucket_index], *current_data_variants, no_more_keys); + } +} + +size_t GraceMergingAggregatedTransform::getMemoryUsage() +{ + Int64 current_memory_usage = 0; + if (auto * memory_tracker_child = DB::CurrentThread::getMemoryTracker()) + if (auto * memory_tracker = memory_tracker_child->getParent()) + current_memory_usage = memory_tracker->get(); + return current_memory_usage < 0 ? 0 : current_memory_usage; +} + +bool GraceMergingAggregatedTransform::isMemoryOverflow() +{ + /// More greedy memory usage strategy. + if (!context->getSettingsRef().max_memory_usage) + return false; + auto max_mem_used = context->getSettingsRef().max_memory_usage * 8 / 10; + auto current_result_rows = current_data_variants->size(); + auto current_mem_used = getMemoryUsage(); + if (per_key_memory_usage > 0) + { + if (current_mem_used + per_key_memory_usage * current_result_rows >= max_mem_used) + { + LOG_INFO( + logger, + "Memory is overflow. current_mem_used: {}, max_mem_used: {}, per_key_memory_usage: {}, aggregator keys: {}, buckets: {}", + ReadableSize(current_mem_used), + ReadableSize(max_mem_used), + ReadableSize(per_key_memory_usage), + current_result_rows, + getBucketsNum()); + return true; + } + } + else + { + if (current_mem_used * 2 >= context->getSettingsRef().max_memory_usage) + { + LOG_INFO( + logger, + "Memory is overflow on half of max usage. current_mem_used: {}, max_mem_used: {}, buckets: {}", + ReadableSize(current_mem_used), + ReadableSize(context->getSettingsRef().max_memory_usage), + getBucketsNum()); + return true; + } + } + return false; +} +} diff --git a/cpp-ch/local-engine/Operator/GraceMergingAggregatedStep.h b/cpp-ch/local-engine/Operator/GraceMergingAggregatedStep.h new file mode 100644 index 000000000000..fe8d66fab95a --- /dev/null +++ b/cpp-ch/local-engine/Operator/GraceMergingAggregatedStep.h @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace local_engine +{ +class GraceMergingAggregatedStep : public DB::ITransformingStep +{ +public: + explicit GraceMergingAggregatedStep( + DB::ContextPtr context_, + const DB::DataStream & input_stream_, + DB::Aggregator::Params params_); + ~GraceMergingAggregatedStep() override = default; + + String getName() const override { return "GraceMergingAggregatedStep"; } + + void transformPipeline(DB::QueryPipelineBuilder & pipeline, const DB::BuildQueryPipelineSettings &) override; + + void describeActions(DB::JSONBuilder::JSONMap & map) const override; + void describeActions(DB::IQueryPlanStep::FormatSettings & settings) const override; +private: + DB::ContextPtr context; + DB::Aggregator::Params params; + void updateOutputStream() override; +}; + +class GraceMergingAggregatedTransform : public DB::IProcessor +{ +public: + static constexpr size_t max_buckets = 32; + using Status = DB::IProcessor::Status; + explicit GraceMergingAggregatedTransform(const DB::Block &header_, DB::AggregatingTransformParamsPtr params_, DB::ContextPtr context_); + ~GraceMergingAggregatedTransform() override; + + Status prepare() override; + void work() override; + String getName() const override { return "GraceMergingAggregatedTransform"; } +private: + DB::Block header; + DB::AggregatingTransformParamsPtr params; + DB::ContextPtr context; + DB::TemporaryDataOnDiskPtr tmp_data_disk; + DB::AggregatedDataVariantsPtr current_data_variants = nullptr; + size_t current_bucket_index = 0; + + struct BufferFileStream + { + std::list blocks; + DB::TemporaryFileStream * file_stream = nullptr; + }; + std::unordered_map buckets; + + size_t getBucketsNum() const { return buckets.size(); } + void extendBuckets(); + void rehashDataVariants(); + DB::Blocks scatterBlock(const DB::Block & block); + void addBlockIntoFileBucket(size_t bucket_index, const DB::Block & block); + void flushBuckets(); + size_t flushBucket(size_t bucket_index); + void prepareBucketOutputBlocks(); + void mergeOneBlock(const DB::Block &block); + size_t getMemoryUsage(); + bool isMemoryOverflow(); + + bool input_finished = false; + bool has_input = false; + DB::Chunk input_chunk; + bool has_output = false; + DB::Chunk output_chunk; + DB::BlocksList current_final_blocks; + bool no_more_keys = false; + + double per_key_memory_usage = 0; + + // metrics + size_t total_input_blocks = 0; + size_t total_input_rows = 0; + size_t total_output_blocks = 0; + size_t total_output_rows = 0; + size_t total_spill_disk_bytes = 0; + size_t total_spill_disk_time = 0; + size_t total_read_disk_time = 0; + size_t total_scatter_time = 0; + + Poco::Logger * logger = &Poco::Logger::get("GraceMergingAggregatedTransform"); +}; +} diff --git a/cpp-ch/local-engine/Operator/StreamingAggregatingStep.cpp b/cpp-ch/local-engine/Operator/StreamingAggregatingStep.cpp new file mode 100644 index 000000000000..266bca07c10b --- /dev/null +++ b/cpp-ch/local-engine/Operator/StreamingAggregatingStep.cpp @@ -0,0 +1,299 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "StreamingAggregatingStep.h" +#include +#include +#include +#include +#include + +namespace DB +{ +namespace ErrorCodes +{ + extern const int LOGICAL_ERROR; +} +} + +namespace local_engine +{ +StreamingAggregatingTransform::StreamingAggregatingTransform(DB::ContextPtr context_, const DB::Block &header_, DB::AggregatingTransformParamsPtr params_) + : DB::IProcessor({header_}, {params_->getHeader()}) + , context(context_) + , header(header_) + , key_columns(params_->params.keys_size) + , aggregate_columns(params_->params.aggregates_size) + , params(params_) +{ +} + +StreamingAggregatingTransform::~StreamingAggregatingTransform() +{ + LOG_INFO( + logger, + "Metrics. total_input_blocks: {}, total_input_rows: {}, total_output_blocks: {}, total_output_rows: {}, " + "total_clear_data_variants_num: {}, total_aggregate_time: {}, total_convert_data_variants_time: {}", + total_input_blocks, + total_input_rows, + total_output_blocks, + total_output_rows, + total_clear_data_variants_num, + total_aggregate_time, + total_convert_data_variants_time); +} + +StreamingAggregatingTransform::Status StreamingAggregatingTransform::prepare() +{ + auto & output = outputs.front(); + auto & input = inputs.front(); + if (output.isFinished()) + { + input.close(); + return Status::Finished; + } + if (has_output) + { + if (output.canPush()) + { + total_output_rows += output_chunk.getNumRows(); + total_output_blocks++; + output.push(std::move(output_chunk)); + has_output = false; + } + return Status::PortFull; + } + + if (has_input) + return Status::Ready; + + if (input.isFinished()) + { + if (!data_variants) + { + output.finish(); + return Status::Finished; + } + else + { + return Status::Ready; + } + } + + input.setNeeded(); + if (!input.hasData()) + { + return Status::NeedData; + } + input_chunk = input.pull(true); + total_input_rows += input_chunk.getNumRows(); + total_input_blocks++; + has_input = true; + return Status::Ready; +} + +static UInt64 getMemoryUsage() +{ + Int64 current_memory_usage = 0; + if (auto * memory_tracker_child = DB::CurrentThread::getMemoryTracker()) + if (auto * memory_tracker = memory_tracker_child->getParent()) + current_memory_usage = memory_tracker->get(); + return current_memory_usage < 0 ? 0 : current_memory_usage; +} + +bool StreamingAggregatingTransform::isMemoryOverflow() +{ + /// More greedy memory usage strategy. + if (!context->getSettingsRef().max_memory_usage) + return false; + auto max_mem_used = context->getSettingsRef().max_memory_usage * 8 / 10; + auto current_result_rows = data_variants->size(); + auto current_mem_used = getMemoryUsage(); + if (per_key_memory_usage > 0) + { + if (current_mem_used + per_key_memory_usage * current_result_rows >= max_mem_used) + { + LOG_INFO( + logger, + "Memory is overflow. current_mem_used: {}, max_mem_used: {}, per_key_memory_usage: {}, aggregator keys: {}", + ReadableSize(current_mem_used), + ReadableSize(max_mem_used), + ReadableSize(per_key_memory_usage), + current_result_rows); + return true; + } + } + else + { + if (current_mem_used * 2 >= context->getSettingsRef().max_memory_usage) + { + LOG_INFO( + logger, + "Memory is overflow on half of max usage. current_mem_used: {}, max_mem_used: {}", + ReadableSize(current_mem_used), + ReadableSize(context->getSettingsRef().max_memory_usage)); + return true; + } + } + return false; +} + + +void StreamingAggregatingTransform::work() +{ + auto pop_one_pending_block = [&]() + { + while (!pending_blocks.empty()) + { + if (!pending_blocks.front().rows()) + { + pending_blocks.pop_front(); + continue; + } + // downstream is GraceMergingAggregatedStep, don't need this bock_info. + // make it be default value. + pending_blocks.front().info = DB::BlockInfo(); + + output_chunk = DB::convertToChunk(pending_blocks.front()); + pending_blocks.pop_front(); + has_output = true; + has_input = !pending_blocks.empty(); + return true; + } + return false; + }; + + if (has_input) + { + if (pop_one_pending_block()) + return; + + if (!input_chunk.getNumRows()) + { + has_input = false; + return; + } + + if (!data_variants) + data_variants = std::make_shared(); + + auto num_rows = input_chunk.getNumRows(); + Stopwatch watch; + params->aggregator.executeOnBlock( + input_chunk.detachColumns(), 0, num_rows, *data_variants, key_columns, aggregate_columns, no_more_keys); + total_aggregate_time += watch.elapsedMicroseconds(); + has_input = false; + + if (isMemoryOverflow()) + { + Stopwatch convert_watch; + /// When convert data variants to blocks, memory usage may be double. + pending_blocks = params->aggregator.convertToBlocks(*data_variants, false, 1); + + size_t total_mem_used = 0; + size_t total_rows = 0; + for (const auto & block : pending_blocks) + { + total_mem_used += block.allocatedBytes(); + total_rows += block.rows(); + } + if (total_rows) + per_key_memory_usage = total_mem_used * 1.0 / total_rows; + + total_convert_data_variants_time += convert_watch.elapsedMicroseconds(); + total_clear_data_variants_num++; + data_variants = nullptr; + pop_one_pending_block(); + } + } + else + { + // NOLINTNEXTLINE + if (!data_variants->size()) + { + has_output = false; + } + Stopwatch convert_watch; + pending_blocks = params->aggregator.convertToBlocks(*data_variants, false, 1); + total_clear_data_variants_num++; + total_aggregate_time += convert_watch.elapsedMicroseconds(); + data_variants = nullptr; + pop_one_pending_block(); + } +} + +static DB::ITransformingStep::Traits getTraits() +{ + return DB::ITransformingStep::Traits + { + { + .preserves_number_of_streams = false, + .preserves_sorting = false, + }, + { + .preserves_number_of_rows = false, + } + }; +} + +static DB::Block buildOutputHeader(const DB::Block & input_header_, const DB::Aggregator::Params params_) +{ + return params_.getHeader(input_header_, false); +} +StreamingAggregatingStep::StreamingAggregatingStep( + DB::ContextPtr context_, const DB::DataStream & input_stream_, DB::Aggregator::Params params_) + : DB::ITransformingStep(input_stream_, buildOutputHeader(input_stream_.header, params_), getTraits()) + , context(context_) + , params(std::move(params_)) +{ +} + +void StreamingAggregatingStep::transformPipeline(DB::QueryPipelineBuilder & pipeline, const DB::BuildQueryPipelineSettings &) +{ + pipeline.dropTotalsAndExtremes(); + auto transform_params = std::make_shared(pipeline.getHeader(), params, false); + pipeline.resize(1); + auto build_transform = [&](DB::OutputPortRawPtrs outputs) + { + DB::Processors new_processors; + for (auto & output : outputs) + { + auto op = std::make_shared(context, pipeline.getHeader(), transform_params); + new_processors.push_back(op); + DB::connect(*output, op->getInputs().front()); + } + return new_processors; + }; + pipeline.transform(build_transform); +} + +void StreamingAggregatingStep::describeActions(DB::IQueryPlanStep::FormatSettings & settings) const +{ + return params.explain(settings.out, settings.offset); +} + +void StreamingAggregatingStep::describeActions(DB::JSONBuilder::JSONMap & map) const +{ + params.explain(map); +} + +void StreamingAggregatingStep::updateOutputStream() +{ + output_stream = createOutputStream(input_streams.front(), buildOutputHeader(input_streams.front().header, params), getDataStreamTraits()); +} + +} diff --git a/cpp-ch/local-engine/Operator/StreamingAggregatingStep.h b/cpp-ch/local-engine/Operator/StreamingAggregatingStep.h new file mode 100644 index 000000000000..3d54c1af615b --- /dev/null +++ b/cpp-ch/local-engine/Operator/StreamingAggregatingStep.h @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace local_engine +{ +class StreamingAggregatingTransform : public DB::IProcessor +{ +public: + using Status = DB::IProcessor::Status; + explicit StreamingAggregatingTransform(DB::ContextPtr context_, const DB::Block &header_, DB::AggregatingTransformParamsPtr params_); + ~StreamingAggregatingTransform() override; + String getName() const override { return "StreamingAggregatingTransform"; } + Status prepare() override; + void work() override; +private: + DB::ContextPtr context; + DB::Block header; + DB::ColumnRawPtrs key_columns; + DB::Aggregator::AggregateColumns aggregate_columns; + DB::AggregatingTransformParamsPtr params; + + bool no_more_keys = false; + bool is_consume_finished = false; + bool is_clear_aggregator = false; + DB::AggregatedDataVariantsPtr data_variants = nullptr; + bool has_input = false; + bool has_output = false; + DB::Chunk input_chunk; + DB::Chunk output_chunk; + + DB::BlocksList pending_blocks; + Poco::Logger * logger = &Poco::Logger::get("StreamingAggregatingTransform"); + + double per_key_memory_usage = 0; + + // metrics + size_t total_input_blocks = 0; + size_t total_input_rows = 0; + size_t total_output_blocks = 0; + size_t total_output_rows = 0; + size_t total_clear_data_variants_num = 0; + size_t total_aggregate_time = 0; + size_t total_convert_data_variants_time = 0; + + bool isMemoryOverflow(); +}; + +class StreamingAggregatingStep : public DB::ITransformingStep +{ +public: + explicit StreamingAggregatingStep( + DB::ContextPtr context_, + const DB::DataStream & input_stream_, + DB::Aggregator::Params params_); + ~StreamingAggregatingStep() override = default; + + String getName() const override { return "StreamingAggregating"; } + + void transformPipeline(DB::QueryPipelineBuilder & pipeline, const DB::BuildQueryPipelineSettings &) override; + + void describeActions(DB::JSONBuilder::JSONMap & map) const override; + void describeActions(DB::IQueryPlanStep::FormatSettings & settings) const override; + +private: + DB::ContextPtr context; + DB::Aggregator::Params params; + void updateOutputStream() override; +}; +} diff --git a/cpp-ch/local-engine/Parser/AggregateRelParser.cpp b/cpp-ch/local-engine/Parser/AggregateRelParser.cpp index d55fac073ccf..fd6de6d2e28d 100644 --- a/cpp-ch/local-engine/Parser/AggregateRelParser.cpp +++ b/cpp-ch/local-engine/Parser/AggregateRelParser.cpp @@ -20,17 +20,18 @@ #include #include #include +#include #include #include +#include +#include +#include #include #include #include #include #include "Common/PODArray.h" #include -#include "DataTypes/IDataType.h" - -#include namespace DB { @@ -237,20 +238,30 @@ void AggregateRelParser::addMergingAggregatedStep() buildAggregateDescriptions(aggregate_descriptions); auto settings = getContext()->getSettingsRef(); Aggregator::Params params(grouping_keys, aggregate_descriptions, false, settings.max_threads, settings.max_block_size); - auto merging_step = std::make_unique( - plan->getCurrentDataStream(), - params, - true, - false, - 1, - 1, - false, - settings.max_block_size, - settings.aggregation_in_order_max_block_bytes, - SortDescription(), - settings.enable_memory_bound_merging_of_aggregation_results); - steps.emplace_back(merging_step.get()); - plan->addStep(std::move(merging_step)); + + if (settings.distributed_aggregation_memory_efficient) + { + auto merging_step = std::make_unique(getContext(), plan->getCurrentDataStream(), params); + steps.emplace_back(merging_step.get()); + plan->addStep(std::move(merging_step)); + } + else + { + auto merging_step = std::make_unique( + plan->getCurrentDataStream(), + params, + true, + false, + 1, + 1, + false, + settings.max_block_size, + settings.aggregation_in_order_max_block_bytes, + SortDescription(), + settings.enable_memory_bound_merging_of_aggregation_results); + steps.emplace_back(merging_step.get()); + plan->addStep(std::move(merging_step)); + } } void AggregateRelParser::addAggregatingStep() @@ -258,44 +269,74 @@ void AggregateRelParser::addAggregatingStep() AggregateDescriptions aggregate_descriptions; buildAggregateDescriptions(aggregate_descriptions); auto settings = getContext()->getSettingsRef(); - Aggregator::Params params( - grouping_keys, - aggregate_descriptions, - false, - settings.max_rows_to_group_by, - settings.group_by_overflow_mode, - settings.group_by_two_level_threshold, - settings.group_by_two_level_threshold_bytes, - settings.max_bytes_before_external_group_by, - settings.empty_result_for_aggregation_by_empty_set, - getContext()->getTempDataOnDisk(), - settings.max_threads, - settings.min_free_disk_space_for_temporary_data, - true, - 3, - settings.max_block_size, - /*enable_prefetch*/ true, - /*only_merge*/ false, - settings.optimize_group_by_constant_keys); - auto aggregating_step = std::make_unique( - plan->getCurrentDataStream(), - params, - GroupingSetsParamsList(), - false, - settings.max_block_size, - settings.aggregation_in_order_max_block_bytes, - 1, - 1, - false, - false, - SortDescription(), - SortDescription(), - false, - false, - false); - steps.emplace_back(aggregating_step.get()); - plan->addStep(std::move(aggregating_step)); + if (settings.distributed_aggregation_memory_efficient) + { + // Disable spilling to disk. + Aggregator::Params params( + grouping_keys, + aggregate_descriptions, + false, + settings.max_rows_to_group_by, + settings.group_by_overflow_mode, + settings.group_by_two_level_threshold, + settings.group_by_two_level_threshold_bytes, + 0, + settings.empty_result_for_aggregation_by_empty_set, + nullptr, + settings.max_threads, + settings.min_free_disk_space_for_temporary_data, + true, + 3, + settings.max_block_size, + /*enable_prefetch*/ true, + /*only_merge*/ false, + settings.optimize_group_by_constant_keys); + auto aggregating_step = std::make_unique(getContext(), plan->getCurrentDataStream(), params); + steps.emplace_back(aggregating_step.get()); + plan->addStep(std::move(aggregating_step)); + } + else + { + Aggregator::Params params( + grouping_keys, + aggregate_descriptions, + false, + settings.max_rows_to_group_by, + settings.group_by_overflow_mode, + settings.group_by_two_level_threshold, + settings.group_by_two_level_threshold_bytes, + settings.max_bytes_before_external_group_by, + settings.empty_result_for_aggregation_by_empty_set, + getContext()->getTempDataOnDisk(), + settings.max_threads, + settings.min_free_disk_space_for_temporary_data, + true, + 3, + settings.max_block_size, + /*enable_prefetch*/ true, + /*only_merge*/ false, + settings.optimize_group_by_constant_keys); + + auto aggregating_step = std::make_unique( + plan->getCurrentDataStream(), + params, + GroupingSetsParamsList(), + false, + settings.max_block_size, + settings.aggregation_in_order_max_block_bytes, + 1, + 1, + false, + false, + SortDescription(), + SortDescription(), + false, + false, + false); + steps.emplace_back(aggregating_step.get()); + plan->addStep(std::move(aggregating_step)); + } } // Only be called in final stage. diff --git a/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp b/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp index 636ccc843691..0aa7a4d56c05 100644 --- a/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp +++ b/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp @@ -434,6 +434,7 @@ DataTypePtr wrapNullableType(bool nullable, DataTypePtr nested_type) QueryPlanPtr SerializedPlanParser::parse(std::unique_ptr plan) { auto * logger = &Poco::Logger::get("SerializedPlanParser"); + if (logger->debug()) { namespace pb_util = google::protobuf::util; diff --git a/cpp-ch/local-engine/Shuffle/CachedShuffleWriter.cpp b/cpp-ch/local-engine/Shuffle/CachedShuffleWriter.cpp index 4a011625874e..191b8efb0ff3 100644 --- a/cpp-ch/local-engine/Shuffle/CachedShuffleWriter.cpp +++ b/cpp-ch/local-engine/Shuffle/CachedShuffleWriter.cpp @@ -94,9 +94,9 @@ CachedShuffleWriter::CachedShuffleWriter(const String & short_name, const SplitO split_result.raw_partition_lengths.resize(options.partition_num, 0); } - void CachedShuffleWriter::split(DB::Block & block) { + auto block_info = block.info; initOutputIfNeeded(block); Stopwatch split_time_watch; @@ -112,6 +112,7 @@ void CachedShuffleWriter::split(DB::Block & block) { out_block.insert(block.getByPosition(output_columns_indicies[col_i])); } + out_block.info = block_info; partition_writer->write(partition_info, out_block); }