diff --git a/dbms/src/Common/ThresholdUtils.h b/dbms/src/Common/ThresholdUtils.h
new file mode 100644
index 00000000000..255e3788572
--- /dev/null
+++ b/dbms/src/Common/ThresholdUtils.h
@@ -0,0 +1,27 @@
+// Copyright 2023 PingCAP, Ltd.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#pragma once
+
+#include <assert.h>
+
+#include <algorithm>
+
+inline size_t getAverageThreshold(size_t threshold, size_t concurrency)
+{
+    assert(concurrency > 0);
+    if (threshold == 0)
+        return 0;
+    return std::max(static_cast<size_t>(1), threshold / concurrency);
+}
diff --git a/dbms/src/DataStreams/AggregatingBlockInputStream.cpp b/dbms/src/DataStreams/AggregatingBlockInputStream.cpp
index dc91a49a096..ad47d57f056 100644
--- a/dbms/src/DataStreams/AggregatingBlockInputStream.cpp
+++ b/dbms/src/DataStreams/AggregatingBlockInputStream.cpp
@@ -34,6 +34,7 @@ Block AggregatingBlockInputStream::readImpl()
             return this->isCancelled();
         };
         aggregator.setCancellationHook(hook);
+        aggregator.initThresholdByAggregatedDataVariantsSize(1);
 
         aggregator.execute(children.back(), *data_variants);
 
diff --git a/dbms/src/DataStreams/ParallelAggregatingBlockInputStream.cpp b/dbms/src/DataStreams/ParallelAggregatingBlockInputStream.cpp
index 86ce5724597..a2514b78f82 100644
--- a/dbms/src/DataStreams/ParallelAggregatingBlockInputStream.cpp
+++ b/dbms/src/DataStreams/ParallelAggregatingBlockInputStream.cpp
@@ -115,8 +115,7 @@ void ParallelAggregatingBlockInputStream::Handler::onBlock(Block & block, size_t
         block,
         *parent.many_data[thread_num],
         parent.threads_data[thread_num].key_columns,
-        parent.threads_data[thread_num].aggregate_columns,
-        parent.threads_data[thread_num].local_delta_memory);
+        parent.threads_data[thread_num].aggregate_columns);
 
     parent.threads_data[thread_num].src_rows += block.rows();
     parent.threads_data[thread_num].src_bytes += block.bytes();
@@ -173,6 +172,7 @@ void ParallelAggregatingBlockInputStream::execute()
     for (size_t i = 0; i < max_threads; ++i)
         threads_data.emplace_back(keys_size, aggregates_size);
+    aggregator.initThresholdByAggregatedDataVariantsSize(many_data.size());
 
     LOG_TRACE(log, "Aggregating");
 
@@ -226,8 +226,7 @@ void ParallelAggregatingBlockInputStream::execute()
         children.at(0)->getHeader(),
         *many_data[0],
         threads_data[0].key_columns,
-        threads_data[0].aggregate_columns,
-        threads_data[0].local_delta_memory);
+        threads_data[0].aggregate_columns);
 }
 
 void ParallelAggregatingBlockInputStream::appendInfo(FmtBuffer & buffer) const
diff --git a/dbms/src/DataStreams/ParallelAggregatingBlockInputStream.h b/dbms/src/DataStreams/ParallelAggregatingBlockInputStream.h
index a23b7f7a516..6575a58de0a 100644
--- a/dbms/src/DataStreams/ParallelAggregatingBlockInputStream.h
+++ b/dbms/src/DataStreams/ParallelAggregatingBlockInputStream.h
@@ -84,7 +84,6 @@ class ParallelAggregatingBlockInputStream : public IProfilingBlockInputStream
     {
         size_t src_rows = 0;
         size_t src_bytes = 0;
-        Int64 local_delta_memory = 0;
         ColumnRawPtrs key_columns;
         Aggregator::AggregateColumns aggregate_columns;
 
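Annotation (not part of the patch): the helper's behavior is easiest to see with concrete numbers. A minimal, self-contained sketch, with the budget and concurrency values chosen arbitrarily for illustration:

```cpp
#include <assert.h>

#include <algorithm>
#include <cstddef>

// Same definition as the new dbms/src/Common/ThresholdUtils.h above.
inline size_t getAverageThreshold(size_t threshold, size_t concurrency)
{
    assert(concurrency > 0);
    if (threshold == 0)
        return 0;
    return std::max(static_cast<size_t>(1), threshold / concurrency);
}

int main()
{
    // A 10 GiB global budget split across 8 concurrent consumers.
    assert(getAverageThreshold(10ULL << 30, 8) == (10ULL << 30) / 8);
    // 0 keeps its "threshold disabled" meaning instead of becoming 1.
    assert(getAverageThreshold(0, 8) == 0);
    // A tiny non-zero budget is clamped to 1, so it never degenerates to "disabled".
    assert(getAverageThreshold(4, 8) == 1);
    return 0;
}
```

The clamp to 1 matters because the callers in this patch treat 0 as "feature off"; without it, any threshold smaller than the concurrency would silently disable two-level conversion or spilling.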
diff --git a/dbms/src/Flash/Coprocessor/AggregationInterpreterHelper.cpp b/dbms/src/Flash/Coprocessor/AggregationInterpreterHelper.cpp
index 8ee50a111fe..bc0b5bb0c1f 100644
--- a/dbms/src/Flash/Coprocessor/AggregationInterpreterHelper.cpp
+++ b/dbms/src/Flash/Coprocessor/AggregationInterpreterHelper.cpp
@@ -12,6 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+#include <Common/ThresholdUtils.h>
 #include
 #include
 #include
@@ -76,6 +77,7 @@ Aggregator::Params buildParams(
     const Context & context,
     const Block & before_agg_header,
     size_t before_agg_streams_size,
+    size_t agg_streams_size,
     const Names & key_names,
     const TiDB::TiDBCollators & collators,
     const AggregateDescriptions & aggregate_descriptions,
@@ -91,6 +93,7 @@ Aggregator::Params buildParams(
     const Settings & settings = context.getSettingsRef();
 
     bool allow_to_use_two_level_group_by = isAllowToUseTwoLevelGroupBy(before_agg_streams_size, settings);
+    auto total_two_level_threshold_bytes = allow_to_use_two_level_group_by ? settings.group_by_two_level_threshold_bytes : SettingUInt64(0);
 
     bool has_collator = std::any_of(begin(collators), end(collators), [](const auto & p) { return p != nullptr; });
 
@@ -98,9 +101,11 @@ Aggregator::Params buildParams(
         before_agg_header,
         keys,
         aggregate_descriptions,
+        /// Do not average the key count threshold: for randomly distributed data,
+        /// the key count in every thread should be almost the same.
         allow_to_use_two_level_group_by ? settings.group_by_two_level_threshold : SettingUInt64(0),
-        allow_to_use_two_level_group_by ? settings.group_by_two_level_threshold_bytes : SettingUInt64(0),
-        settings.max_bytes_before_external_group_by,
+        getAverageThreshold(total_two_level_threshold_bytes, agg_streams_size),
+        getAverageThreshold(settings.max_bytes_before_external_group_by, agg_streams_size),
         !is_final_agg,
         spill_config,
         context.getSettingsRef().max_block_size,
diff --git a/dbms/src/Flash/Coprocessor/AggregationInterpreterHelper.h b/dbms/src/Flash/Coprocessor/AggregationInterpreterHelper.h
index 48b6afeeab5..d3fa500a893 100644
--- a/dbms/src/Flash/Coprocessor/AggregationInterpreterHelper.h
+++ b/dbms/src/Flash/Coprocessor/AggregationInterpreterHelper.h
@@ -37,6 +37,7 @@ Aggregator::Params buildParams(
     const Context & context,
     const Block & before_agg_header,
     size_t before_agg_streams_size,
+    size_t agg_streams_size,
     const Names & key_names,
     const TiDB::TiDBCollators & collators,
     const AggregateDescriptions & aggregate_descriptions,
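Annotation (not part of the patch): buildParams now distinguishes two concurrency values. before_agg_streams_size still drives the two-level heuristic, while agg_streams_size says how many independent aggregation states the byte budgets must be shared among. A sketch of how the call sites below pick it, with assumed example values:

```cpp
#include <cstddef>

int main()
{
    size_t stream_count = 8;                 // pipeline.streams.size(), assumed
    bool enable_fine_grained_shuffle = true; // assumed

    // With fine-grained shuffle, each stream runs its own Aggregator over a
    // single AggregatedDataVariants, so the byte budgets are averaged here.
    // Without it, one ParallelAggregatingBlockInputStream owns all variants
    // and averages the budgets itself through
    // initThresholdByAggregatedDataVariantsSize(many_data.size()), so the
    // totals are passed through unchanged.
    size_t agg_streams_size = enable_fine_grained_shuffle ? stream_count : 1;
    return agg_streams_size == 8 ? 0 : 1;
}
```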
diff --git a/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp b/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp
index 9fcc310d1bc..106277ce75e 100644
--- a/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp
+++ b/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp
@@ -13,6 +13,7 @@
 // limitations under the License.
 
 #include
+#include <Common/ThresholdUtils.h>
 #include
 #include
 #include
@@ -397,6 +398,7 @@ void DAGQueryBlockInterpreter::executeAggregation(
         context,
         before_agg_header,
         pipeline.streams.size(),
+        enable_fine_grained_shuffle ? pipeline.streams.size() : 1,
         key_names,
         collators,
         aggregate_descriptions,
diff --git a/dbms/src/Flash/Coprocessor/DAGUtils.cpp b/dbms/src/Flash/Coprocessor/DAGUtils.cpp
index a7462671c77..180ba343d30 100755
--- a/dbms/src/Flash/Coprocessor/DAGUtils.cpp
+++ b/dbms/src/Flash/Coprocessor/DAGUtils.cpp
@@ -1447,5 +1447,4 @@ tipb::ScalarFuncSig reverseGetFuncSigByFuncName(const String & name)
         throw Exception(fmt::format("Unsupported function {}", name));
     return func_name_sig_map[name];
 }
-
 } // namespace DB
diff --git a/dbms/src/Flash/Coprocessor/DAGUtils.h b/dbms/src/Flash/Coprocessor/DAGUtils.h
index 5776edf0098..1fe665e9c89 100644
--- a/dbms/src/Flash/Coprocessor/DAGUtils.h
+++ b/dbms/src/Flash/Coprocessor/DAGUtils.h
@@ -105,5 +105,4 @@ class UniqueNameGenerator
 tipb::DAGRequest getDAGRequestFromStringWithRetry(const String & s);
 tipb::EncodeType analyzeDAGEncodeType(DAGContext & dag_context);
 tipb::ScalarFuncSig reverseGetFuncSigByFuncName(const String & name);
-
 } // namespace DB
diff --git a/dbms/src/Flash/Coprocessor/InterpreterUtils.cpp b/dbms/src/Flash/Coprocessor/InterpreterUtils.cpp
index adfa0f08ac5..6ea2622ddfc 100644
--- a/dbms/src/Flash/Coprocessor/InterpreterUtils.cpp
+++ b/dbms/src/Flash/Coprocessor/InterpreterUtils.cpp
@@ -12,6 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+#include <Common/ThresholdUtils.h>
 #include
 #include
 #include
@@ -120,7 +121,7 @@ void orderStreams(
             order_descr,
             settings.max_block_size,
             limit,
-            settings.max_bytes_before_external_sort,
+            getAverageThreshold(settings.max_bytes_before_external_sort, pipeline.streams.size()),
             SpillConfig(context.getTemporaryPath(), fmt::format("{}_sort", log->identifier()), settings.max_spilled_size_per_spill, context.getFileProvider()),
             log->identifier());
         stream->setExtraInfo(String(enableFineGrainedShuffleExtraInfo));
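Annotation (not part of the patch): the orderStreams change applies the same averaging idea to external sort. A sketch with assumed example values:

```cpp
#include <algorithm>
#include <cstddef>

int main()
{
    // Assumed example values; the division mirrors getAverageThreshold above.
    size_t max_bytes_before_external_sort = 10ULL << 30; // user setting: 10 GiB
    size_t stream_count = 4;                             // pipeline.streams.size()

    size_t per_stream = std::max(static_cast<size_t>(1), max_bytes_before_external_sort / stream_count);
    // Each sorting stream now spills once it holds ~2.5 GiB, so all streams
    // together stay near the configured 10 GiB. Before this change every
    // stream compared against the full 10 GiB, allowing up to 40 GiB in total.
    return per_stream == (10ULL << 30) / 4 ? 0 : 1;
}
```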
diff --git a/dbms/src/Flash/Planner/Plans/PhysicalAggregation.cpp b/dbms/src/Flash/Planner/Plans/PhysicalAggregation.cpp
index 90a4f441e35..6efdf523b6a 100644
--- a/dbms/src/Flash/Planner/Plans/PhysicalAggregation.cpp
+++ b/dbms/src/Flash/Planner/Plans/PhysicalAggregation.cpp
@@ -99,6 +99,7 @@ void PhysicalAggregation::buildBlockInputStreamImpl(DAGPipeline & pipeline, Cont
         context,
         before_agg_header,
         pipeline.streams.size(),
+        fine_grained_shuffle.enable() ? pipeline.streams.size() : 1,
         aggregation_keys,
         aggregation_collators,
         aggregate_descriptions,
diff --git a/dbms/src/Interpreters/Aggregator.cpp b/dbms/src/Interpreters/Aggregator.cpp
index 1c73d3e28e9..3dc1c98e8b5 100644
--- a/dbms/src/Interpreters/Aggregator.cpp
+++ b/dbms/src/Interpreters/Aggregator.cpp
@@ -15,17 +15,12 @@
 #include
 #include
 #include
-#include
-#include
 #include
-#include
 #include
 #include
+#include <Common/ThresholdUtils.h>
 #include
-#include
-#include
 #include
-#include
 #include
 #include
 #include
@@ -34,14 +29,10 @@
 #include
 #include
 #include
-#include
 #include
 #include
-#include
 #include
-#include
-#include
 
 namespace DB
 {
@@ -177,6 +168,7 @@ void AggregatedDataVariants::convertToTwoLevel()
     default:
         throw Exception("Wrong data variant passed.", ErrorCodes::LOGICAL_ERROR);
     }
+    aggregator->useTwoLevelHashTable();
 }
 
 
@@ -240,9 +232,6 @@ Aggregator::Aggregator(const Params & params_, const String & req_id)
    , log(Logger::get(req_id))
    , is_cancelled([]() { return false; })
 {
-    if (current_memory_tracker)
-        memory_usage_before_aggregation = current_memory_tracker->get();
-
    aggregate_functions.resize(params.aggregates_size);
    for (size_t i = 0; i < params.aggregates_size; ++i)
        aggregate_functions[i] = params.aggregates[i].function.get();
@@ -735,8 +724,7 @@ bool Aggregator::executeOnBlock(
     const Block & block,
     AggregatedDataVariants & result,
     ColumnRawPtrs & key_columns,
-    AggregateColumns & aggregate_columns,
-    Int64 & local_delta_memory)
+    AggregateColumns & aggregate_columns)
 {
     if (is_cancelled())
         return true;
@@ -814,21 +802,14 @@ bool Aggregator::executeOnBlock(
     }
 
     size_t result_size = result.size();
-    Int64 current_memory_usage = 0;
-    if (current_memory_tracker)
-    {
-        current_memory_usage = current_memory_tracker->get();
-        auto updated_local_delta_memory = CurrentMemoryTracker::getLocalDeltaMemory();
-        auto local_delta_memory_diff = updated_local_delta_memory - local_delta_memory;
-        current_memory_usage += (local_memory_usage.fetch_add(local_delta_memory_diff) + local_delta_memory_diff);
-        local_delta_memory = updated_local_delta_memory;
-    }
-
-    auto result_size_bytes = current_memory_usage - memory_usage_before_aggregation; /// Here all the results in the sum are taken into account, from different threads.
+    auto result_size_bytes = result.bytesCount();
 
+    /// worth_convert_to_two_level is set to true if
+    /// 1. some other thread has already converted to two level, or
+    /// 2. the result size exceeds the threshold.
     bool worth_convert_to_two_level
-        = (params.group_by_two_level_threshold && result_size >= params.group_by_two_level_threshold)
-        || (params.group_by_two_level_threshold_bytes && result_size_bytes >= static_cast<Int64>(params.group_by_two_level_threshold_bytes));
+        = use_two_level_hash_table || (group_by_two_level_threshold && result_size >= group_by_two_level_threshold)
+        || (group_by_two_level_threshold_bytes && result_size_bytes >= group_by_two_level_threshold_bytes);
 
     /** Converting to a two-level data structure.
      * It allows you to make, in the subsequent, an effective merge - either economical from memory or parallel.
@@ -837,16 +818,14 @@ bool Aggregator::executeOnBlock(
         result.convertToTwoLevel();
 
     /** Flush data to disk if too much RAM is consumed.
-     * Data can only be flushed to disk if a two-level aggregation structure is used.
+     * Data can only be flushed to disk if a two-level aggregation is supported.
      */
-    if (params.max_bytes_before_external_group_by
-        && result.isTwoLevel()
-        && current_memory_usage > static_cast<Int64>(params.max_bytes_before_external_group_by)
-        && worth_convert_to_two_level)
-    {
-        /// todo: the memory usage is calculated by memory_tracker, it is not accurate since memory tracker
-        /// will tracker all the memory usage for the task/query, need to record and maintain the memory usage
-        /// in Aggregator directly.
+    if (max_bytes_before_external_group_by
+        && (result.isTwoLevel() || result.isConvertibleToTwoLevel())
+        && result_size_bytes > max_bytes_before_external_group_by)
+    {
+        if (!result.isTwoLevel())
+            result.convertToTwoLevel();
         spill(result);
     }
 
@@ -866,6 +845,13 @@ BlockInputStreams Aggregator::restoreSpilledData()
     return spiller->restoreBlocks(0);
 }
 
+void Aggregator::initThresholdByAggregatedDataVariantsSize(size_t aggregated_data_variants_size)
+{
+    group_by_two_level_threshold = params.getGroupByTwoLevelThreshold();
+    group_by_two_level_threshold_bytes = getAverageThreshold(params.getGroupByTwoLevelThresholdBytes(), aggregated_data_variants_size);
+    max_bytes_before_external_group_by = getAverageThreshold(params.getMaxBytesBeforeExternalGroupBy(), aggregated_data_variants_size);
+}
+
 void Aggregator::spill(AggregatedDataVariants & data_variants)
 {
     /// Flush only two-level data and possibly overflow data.
@@ -992,14 +978,14 @@ void Aggregator::execute(const BlockInputStreamPtr & stream, AggregatedDataVaria
         src_rows += block.rows();
         src_bytes += block.bytes();
 
-        if (!executeOnBlock(block, result, key_columns, aggregate_columns, params.local_delta_memory))
+        if (!executeOnBlock(block, result, key_columns, aggregate_columns))
             break;
     }
 
     /// If there was no data, and we aggregate without keys, and we must return single row with the result of empty aggregation.
     /// To do this, we pass a block with zero rows to aggregate.
     if (result.empty() && params.keys_size == 0 && !params.empty_result_for_aggregation_by_empty_set)
-        executeOnBlock(stream->getHeader(), result, key_columns, aggregate_columns, params.local_delta_memory);
+        executeOnBlock(stream->getHeader(), result, key_columns, aggregate_columns);
 
     double elapsed_seconds = watch.elapsedSeconds();
     size_t rows = result.size();
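Annotation (not part of the patch): a condensed, mocked sketch of the decision executeOnBlock() now makes per block. MockVariants and shouldSpill are illustrative names standing in for AggregatedDataVariants and the inlined logic above; only the conditions mirror the patch.

```cpp
#include <cstddef>

// Simplified stand-in for AggregatedDataVariants; only what the decision needs.
struct MockVariants
{
    size_t keys = 0;         // result.size()
    size_t bytes = 0;        // result.bytesCount()
    bool two_level = false;
    bool convertible = true; // result.isConvertibleToTwoLevel()

    bool isTwoLevel() const { return two_level; }
    bool isConvertibleToTwoLevel() const { return convertible; }
    void convertToTwoLevel() { two_level = true; }
};

// Mirrors the post-patch tail of Aggregator::executeOnBlock(). The thresholds
// are the per-instance members set by initThresholdByAggregatedDataVariantsSize();
// returns true when the variants should be spilled to disk.
bool shouldSpill(
    MockVariants & result,
    bool use_two_level_hash_table,
    size_t group_by_two_level_threshold,
    size_t group_by_two_level_threshold_bytes,
    size_t max_bytes_before_external_group_by)
{
    bool worth_convert_to_two_level
        = use_two_level_hash_table
        || (group_by_two_level_threshold && result.keys >= group_by_two_level_threshold)
        || (group_by_two_level_threshold_bytes && result.bytes >= group_by_two_level_threshold_bytes);

    if (worth_convert_to_two_level && result.isConvertibleToTwoLevel())
        result.convertToTwoLevel();

    // Spilling now compares the Aggregator-owned byte count against the
    // per-variant budget instead of asking the query-wide memory tracker.
    if (max_bytes_before_external_group_by
        && (result.isTwoLevel() || result.isConvertibleToTwoLevel())
        && result.bytes > max_bytes_before_external_group_by)
    {
        if (!result.isTwoLevel())
            result.convertToTwoLevel();
        return true;
    }
    return false;
}

int main()
{
    MockVariants v;
    v.keys = 200000;      // above the 100k two-level key threshold
    v.bytes = 2ULL << 30; // 2 GiB of aggregation state, above the 1 GiB budget
    return shouldSpill(v, false, 100000, 0, 1ULL << 30) ? 0 : 1;
}
```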
diff --git a/dbms/src/Interpreters/Aggregator.h b/dbms/src/Interpreters/Aggregator.h
index e8dbf0d4d69..087f338eb96 100644
--- a/dbms/src/Interpreters/Aggregator.h
+++ b/dbms/src/Interpreters/Aggregator.h
@@ -718,7 +718,7 @@ struct AggregatedDataVariants : private boost::noncopyable
     case Type::NAME: \
     { \
         const auto * ptr = reinterpret_cast(aggregation_method_impl); \
-        return ptr->data.size() + (without_key != nullptr); \
+        return ptr->data.size(); \
     }
 
         APPLY_FOR_AGGREGATED_VARIANTS(M)
@@ -729,6 +729,34 @@ struct AggregatedDataVariants : private boost::noncopyable
         }
     }
 
+    size_t bytesCount() const
+    {
+        size_t bytes_count = 0;
+        switch (type)
+        {
+        case Type::EMPTY:
+        case Type::without_key:
+            break;
+
+#define M(NAME, IS_TWO_LEVEL) \
+    case Type::NAME: \
+    { \
+        const auto * ptr = reinterpret_cast(aggregation_method_impl); \
+        bytes_count = ptr->data.getBufferSizeInBytes(); \
+        break; \
+    }
+
+        APPLY_FOR_AGGREGATED_VARIANTS(M)
+#undef M
+
+        default:
+            throw Exception("Unknown aggregated data variant.", ErrorCodes::UNKNOWN_AGGREGATED_DATA_VARIANT);
+        }
+        for (const auto & pool : aggregates_pools)
+            bytes_count += pool->size();
+        return bytes_count;
+    }
+
     const char * getMethodName() const
     {
         return getMethodName(type);
@@ -888,23 +916,10 @@ class Aggregator
         AggregateDescriptions aggregates;
         size_t keys_size;
         size_t aggregates_size;
-        Int64 local_delta_memory = 0;
-
-        /// Two-level aggregation settings (used for a large number of keys).
-        /** With how many keys or the size of the aggregation state in bytes,
-         * two-level aggregation begins to be used. Enough to reach of at least one of the thresholds.
-         * 0 - the corresponding threshold is not specified.
-         */
-        const size_t group_by_two_level_threshold;
-        const size_t group_by_two_level_threshold_bytes;
-
-        /// Settings to flush temporary data to the filesystem (external aggregation).
-        const size_t max_bytes_before_external_group_by; /// 0 - do not use external aggregation.
 
         /// Return empty result when aggregating without keys on empty set.
         bool empty_result_for_aggregation_by_empty_set;
-        const std::string tmp_path;
         SpillConfig spill_config;
         UInt64 max_block_size;
@@ -926,13 +941,13 @@ class Aggregator
             , aggregates(aggregates_)
             , keys_size(keys.size())
             , aggregates_size(aggregates.size())
-            , group_by_two_level_threshold(group_by_two_level_threshold_)
-            , group_by_two_level_threshold_bytes(group_by_two_level_threshold_bytes_)
-            , max_bytes_before_external_group_by(max_bytes_before_external_group_by_)
             , empty_result_for_aggregation_by_empty_set(empty_result_for_aggregation_by_empty_set_)
             , spill_config(spill_config_)
             , max_block_size(max_block_size_)
             , collators(collators_)
+            , group_by_two_level_threshold(group_by_two_level_threshold_)
+            , group_by_two_level_threshold_bytes(group_by_two_level_threshold_bytes_)
+            , max_bytes_before_external_group_by(max_bytes_before_external_group_by_)
         {
         }
 
@@ -962,6 +977,17 @@ class Aggregator
         /// Calculate the column numbers in `keys` and `aggregates`.
         void calculateColumnNumbers(const Block & block);
+
+        size_t getGroupByTwoLevelThreshold() const { return group_by_two_level_threshold; }
+        size_t getGroupByTwoLevelThresholdBytes() const { return group_by_two_level_threshold_bytes; }
+        size_t getMaxBytesBeforeExternalGroupBy() const { return max_bytes_before_external_group_by; }
+
+    private:
+        /// Note: these thresholds should not be used directly; they are only used to
+        /// initialize the thresholds in Aggregator.
+        const size_t group_by_two_level_threshold;
+        const size_t group_by_two_level_threshold_bytes;
+        const size_t max_bytes_before_external_group_by; /// 0 - do not use external aggregation.
     };
 
 
@@ -980,8 +1006,8 @@ class Aggregator
         const Block & block,
         AggregatedDataVariants & result,
         ColumnRawPtrs & key_columns,
-        AggregateColumns & aggregate_columns, /// Passed to not create them anew for each block
-        Int64 & local_delta_memory);
+        AggregateColumns & aggregate_columns /// Passed to not create them anew for each block
+    );
 
     /** Convert the aggregation data structure into a block.
      * If final = false, then ColumnAggregateFunction is created as the aggregation columns with the state of the calculations,
@@ -1020,6 +1046,8 @@ class Aggregator
     void finishSpill();
     BlockInputStreams restoreSpilledData();
     bool hasSpilledData() const { return spiller != nullptr && spiller->hasSpilledData(); }
+    void useTwoLevelHashTable() { use_two_level_hash_table = true; }
+    void initThresholdByAggregatedDataVariantsSize(size_t aggregated_data_variants_size);
 
     /// Get data structure of the result.
     Block getHeader(bool final) const;
@@ -1064,10 +1092,7 @@ class Aggregator
 
     bool all_aggregates_has_trivial_destructor = false;
 
-    /// How many RAM were used to process the query before processing the first block.
-    Int64 memory_usage_before_aggregation = 0;
-
-    std::atomic<Int64> local_memory_usage = 0;
+    std::atomic<bool> use_two_level_hash_table = false;
 
     std::mutex mutex;
 
@@ -1076,6 +1101,16 @@ class Aggregator
     /// Returns true if you can abort the current task.
     CancellationHook is_cancelled;
 
+    /// Two-level aggregation settings (used for a large number of keys).
+    /** With how many keys or the size of the aggregation state in bytes,
+     * two-level aggregation begins to be used. Enough to reach at least one of the thresholds.
+     * 0 - the corresponding threshold is not specified.
+     */
+    size_t group_by_two_level_threshold = 0;
+    size_t group_by_two_level_threshold_bytes = 0;
+    /// Settings to flush temporary data to the filesystem (external aggregation).
+    size_t max_bytes_before_external_group_by = 0;
+
     /// For external aggregation.
     std::unique_ptr<Spiller> spiller;
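Annotation (not part of the patch): bytesCount() replaces the memory-tracker delta as the spill signal. It sums the hash table's bucket buffer and the arenas holding aggregate function states, which is cheap to recompute per block and counts only this Aggregator's own state. A mocked sketch of the accounting; MockHashTable, MockArena, and the byte figures are illustrative stand-ins, while getBufferSizeInBytes() and size() are the method names the patch actually calls:

```cpp
#include <cstddef>
#include <memory>
#include <vector>

struct MockHashTable
{
    size_t bucket_count = 1 << 16;
    size_t bytes_per_bucket = 24; // assumed, for illustration
    size_t getBufferSizeInBytes() const { return bucket_count * bytes_per_bucket; }
};

struct MockArena
{
    size_t allocated = 4ULL << 20; // 4 MiB of aggregate states
    size_t size() const { return allocated; }
};

// Mirrors AggregatedDataVariants::bytesCount(): hash table buffer plus all
// arena chunks that hold the aggregate function states.
size_t bytesCount(const MockHashTable & data, const std::vector<std::shared_ptr<MockArena>> & aggregates_pools)
{
    size_t bytes_count = data.getBufferSizeInBytes();
    for (const auto & pool : aggregates_pools)
        bytes_count += pool->size();
    return bytes_count;
}

int main()
{
    MockHashTable table;
    std::vector<std::shared_ptr<MockArena>> pools{std::make_shared<MockArena>()};
    // Bucket buffer plus arena bytes form the tracked footprint.
    return bytesCount(table, pools) == table.getBufferSizeInBytes() + (4ULL << 20) ? 0 : 1;
}
```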