Skip to content

Commit

Permalink
Refine the spill threshold for aggregation/sort, refine the memory us…
Browse files Browse the repository at this point in the history
…age calculation for aggregation (#6708)

ref #6528
  • Loading branch information
windtalker authored Feb 9, 2023
1 parent 77c4d0f commit 8942a86
Show file tree
Hide file tree
Showing 13 changed files with 127 additions and 72 deletions.
27 changes: 27 additions & 0 deletions dbms/src/Common/ThresholdUtils.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
// Copyright 2023 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include <assert.h>

#include <algorithm>

/// Splits a total threshold evenly between `concurrency` consumers.
///
/// @param threshold   Total limit to share. 0 is passed through unchanged.
/// @param concurrency Number of consumers sharing the limit; expected to be > 0.
/// @return 0 when `threshold` is 0; otherwise `threshold / concurrency`, clamped to at least 1
///         so that a non-zero threshold never degrades to 0 through integer division.
inline size_t getAverageThreshold(size_t threshold, size_t concurrency)
{
    assert(concurrency > 0);
    if (threshold == 0)
        return 0;
    // assert() is compiled out under NDEBUG, so guard the division explicitly:
    // with no consumers to share with, hand back the whole threshold instead of dividing by zero.
    if (concurrency == 0)
        return threshold;
    return std::max(static_cast<size_t>(1), threshold / concurrency);
}
1 change: 1 addition & 0 deletions dbms/src/DataStreams/AggregatingBlockInputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ Block AggregatingBlockInputStream::readImpl()
return this->isCancelled();
};
aggregator.setCancellationHook(hook);
aggregator.initThresholdByAggregatedDataVariantsSize(1);

aggregator.execute(children.back(), *data_variants);

Expand Down
7 changes: 3 additions & 4 deletions dbms/src/DataStreams/ParallelAggregatingBlockInputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,7 @@ void ParallelAggregatingBlockInputStream::Handler::onBlock(Block & block, size_t
block,
*parent.many_data[thread_num],
parent.threads_data[thread_num].key_columns,
parent.threads_data[thread_num].aggregate_columns,
parent.threads_data[thread_num].local_delta_memory);
parent.threads_data[thread_num].aggregate_columns);

parent.threads_data[thread_num].src_rows += block.rows();
parent.threads_data[thread_num].src_bytes += block.bytes();
Expand Down Expand Up @@ -173,6 +172,7 @@ void ParallelAggregatingBlockInputStream::execute()

for (size_t i = 0; i < max_threads; ++i)
threads_data.emplace_back(keys_size, aggregates_size);
aggregator.initThresholdByAggregatedDataVariantsSize(many_data.size());

LOG_TRACE(log, "Aggregating");

Expand Down Expand Up @@ -226,8 +226,7 @@ void ParallelAggregatingBlockInputStream::execute()
children.at(0)->getHeader(),
*many_data[0],
threads_data[0].key_columns,
threads_data[0].aggregate_columns,
threads_data[0].local_delta_memory);
threads_data[0].aggregate_columns);
}

void ParallelAggregatingBlockInputStream::appendInfo(FmtBuffer & buffer) const
Expand Down
1 change: 0 additions & 1 deletion dbms/src/DataStreams/ParallelAggregatingBlockInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,6 @@ class ParallelAggregatingBlockInputStream : public IProfilingBlockInputStream
{
size_t src_rows = 0;
size_t src_bytes = 0;
Int64 local_delta_memory = 0;

ColumnRawPtrs key_columns;
Aggregator::AggregateColumns aggregate_columns;
Expand Down
9 changes: 7 additions & 2 deletions dbms/src/Flash/Coprocessor/AggregationInterpreterHelper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include <Common/ThresholdUtils.h>
#include <Common/TiFlashException.h>
#include <Core/ColumnNumbers.h>
#include <Flash/Coprocessor/AggregationInterpreterHelper.h>
Expand Down Expand Up @@ -76,6 +77,7 @@ Aggregator::Params buildParams(
const Context & context,
const Block & before_agg_header,
size_t before_agg_streams_size,
size_t agg_streams_size,
const Names & key_names,
const TiDB::TiDBCollators & collators,
const AggregateDescriptions & aggregate_descriptions,
Expand All @@ -91,16 +93,19 @@ Aggregator::Params buildParams(
const Settings & settings = context.getSettingsRef();

bool allow_to_use_two_level_group_by = isAllowToUseTwoLevelGroupBy(before_agg_streams_size, settings);
auto total_two_level_threshold_bytes = allow_to_use_two_level_group_by ? settings.group_by_two_level_threshold_bytes : SettingUInt64(0);

bool has_collator = std::any_of(begin(collators), end(collators), [](const auto & p) { return p != nullptr; });

return Aggregator::Params(
before_agg_header,
keys,
aggregate_descriptions,
/// Do not use the average value for the key count threshold: for randomly distributed data, the key count
/// in every thread should be almost the same.
allow_to_use_two_level_group_by ? settings.group_by_two_level_threshold : SettingUInt64(0),
allow_to_use_two_level_group_by ? settings.group_by_two_level_threshold_bytes : SettingUInt64(0),
settings.max_bytes_before_external_group_by,
getAverageThreshold(total_two_level_threshold_bytes, agg_streams_size),
getAverageThreshold(settings.max_bytes_before_external_group_by, agg_streams_size),
!is_final_agg,
spill_config,
context.getSettingsRef().max_block_size,
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Flash/Coprocessor/AggregationInterpreterHelper.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ Aggregator::Params buildParams(
const Context & context,
const Block & before_agg_header,
size_t before_agg_streams_size,
size_t agg_streams_size,
const Names & key_names,
const TiDB::TiDBCollators & collators,
const AggregateDescriptions & aggregate_descriptions,
Expand Down
2 changes: 2 additions & 0 deletions dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

#include <Common/FailPoint.h>
#include <Common/ThresholdUtils.h>
#include <Common/TiFlashException.h>
#include <Core/NamesAndTypes.h>
#include <DataStreams/AggregatingBlockInputStream.h>
Expand Down Expand Up @@ -397,6 +398,7 @@ void DAGQueryBlockInterpreter::executeAggregation(
context,
before_agg_header,
pipeline.streams.size(),
enable_fine_grained_shuffle ? pipeline.streams.size() : 1,
key_names,
collators,
aggregate_descriptions,
Expand Down
1 change: 0 additions & 1 deletion dbms/src/Flash/Coprocessor/DAGUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1447,5 +1447,4 @@ tipb::ScalarFuncSig reverseGetFuncSigByFuncName(const String & name)
throw Exception(fmt::format("Unsupported function {}", name));
return func_name_sig_map[name];
}

} // namespace DB
1 change: 0 additions & 1 deletion dbms/src/Flash/Coprocessor/DAGUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -105,5 +105,4 @@ class UniqueNameGenerator
tipb::DAGRequest getDAGRequestFromStringWithRetry(const String & s);
tipb::EncodeType analyzeDAGEncodeType(DAGContext & dag_context);
tipb::ScalarFuncSig reverseGetFuncSigByFuncName(const String & name);

} // namespace DB
3 changes: 2 additions & 1 deletion dbms/src/Flash/Coprocessor/InterpreterUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include <Common/ThresholdUtils.h>
#include <DataStreams/CreatingSetsBlockInputStream.h>
#include <DataStreams/ExpressionBlockInputStream.h>
#include <DataStreams/FilterBlockInputStream.h>
Expand Down Expand Up @@ -120,7 +121,7 @@ void orderStreams(
order_descr,
settings.max_block_size,
limit,
settings.max_bytes_before_external_sort,
getAverageThreshold(settings.max_bytes_before_external_sort, pipeline.streams.size()),
SpillConfig(context.getTemporaryPath(), fmt::format("{}_sort", log->identifier()), settings.max_spilled_size_per_spill, context.getFileProvider()),
log->identifier());
stream->setExtraInfo(String(enableFineGrainedShuffleExtraInfo));
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Flash/Planner/Plans/PhysicalAggregation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ void PhysicalAggregation::buildBlockInputStreamImpl(DAGPipeline & pipeline, Cont
context,
before_agg_header,
pipeline.streams.size(),
fine_grained_shuffle.enable() ? pipeline.streams.size() : 1,
aggregation_keys,
aggregation_collators,
aggregate_descriptions,
Expand Down
64 changes: 25 additions & 39 deletions dbms/src/Interpreters/Aggregator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,12 @@
#include <AggregateFunctions/AggregateFunctionArray.h>
#include <AggregateFunctions/AggregateFunctionCount.h>
#include <AggregateFunctions/AggregateFunctionState.h>
#include <Columns/ColumnTuple.h>
#include <Common/ClickHouseRevision.h>
#include <Common/FailPoint.h>
#include <Common/MemoryTracker.h>
#include <Common/Stopwatch.h>
#include <Common/ThreadManager.h>
#include <Common/ThresholdUtils.h>
#include <Common/typeid_cast.h>
#include <Common/wrapInvocable.h>
#include <DataStreams/IProfilingBlockInputStream.h>
#include <DataStreams/MergingAndConvertingBlockInputStream.h>
#include <DataStreams/NativeBlockOutputStream.h>
#include <DataStreams/NullBlockInputStream.h>
#include <DataStreams/materializeBlock.h>
#include <DataTypes/DataTypeAggregateFunction.h>
Expand All @@ -34,14 +29,10 @@
#include <IO/CompressedWriteBuffer.h>
#include <Interpreters/Aggregator.h>
#include <Storages/Transaction/CollatorUtils.h>
#include <common/demangle.h>

#include <array>
#include <cassert>
#include <cstddef>
#include <future>
#include <iomanip>
#include <thread>

namespace DB
{
Expand Down Expand Up @@ -177,6 +168,7 @@ void AggregatedDataVariants::convertToTwoLevel()
default:
throw Exception("Wrong data variant passed.", ErrorCodes::LOGICAL_ERROR);
}
aggregator->useTwoLevelHashTable();
}


Expand Down Expand Up @@ -240,9 +232,6 @@ Aggregator::Aggregator(const Params & params_, const String & req_id)
, log(Logger::get(req_id))
, is_cancelled([]() { return false; })
{
if (current_memory_tracker)
memory_usage_before_aggregation = current_memory_tracker->get();

aggregate_functions.resize(params.aggregates_size);
for (size_t i = 0; i < params.aggregates_size; ++i)
aggregate_functions[i] = params.aggregates[i].function.get();
Expand Down Expand Up @@ -735,8 +724,7 @@ bool Aggregator::executeOnBlock(
const Block & block,
AggregatedDataVariants & result,
ColumnRawPtrs & key_columns,
AggregateColumns & aggregate_columns,
Int64 & local_delta_memory)
AggregateColumns & aggregate_columns)
{
if (is_cancelled())
return true;
Expand Down Expand Up @@ -814,21 +802,14 @@ bool Aggregator::executeOnBlock(
}

size_t result_size = result.size();
Int64 current_memory_usage = 0;
if (current_memory_tracker)
{
current_memory_usage = current_memory_tracker->get();
auto updated_local_delta_memory = CurrentMemoryTracker::getLocalDeltaMemory();
auto local_delta_memory_diff = updated_local_delta_memory - local_delta_memory;
current_memory_usage += (local_memory_usage.fetch_add(local_delta_memory_diff) + local_delta_memory_diff);
local_delta_memory = updated_local_delta_memory;
}

auto result_size_bytes = current_memory_usage - memory_usage_before_aggregation; /// Here all the results in the sum are taken into account, from different threads.
auto result_size_bytes = result.bytesCount();

/// worth_convert_to_two_level is set to true if either
/// 1. some other thread has already converted to two-level, or
/// 2. the result size exceeds a threshold.
bool worth_convert_to_two_level
= (params.group_by_two_level_threshold && result_size >= params.group_by_two_level_threshold)
|| (params.group_by_two_level_threshold_bytes && result_size_bytes >= static_cast<Int64>(params.group_by_two_level_threshold_bytes));
= use_two_level_hash_table || (group_by_two_level_threshold && result_size >= group_by_two_level_threshold)
|| (group_by_two_level_threshold_bytes && result_size_bytes >= group_by_two_level_threshold_bytes);

/** Converting to a two-level data structure.
* It allows you to make, in the subsequent, an effective merge - either economical from memory or parallel.
Expand All @@ -837,16 +818,14 @@ bool Aggregator::executeOnBlock(
result.convertToTwoLevel();

/** Flush data to disk if too much RAM is consumed.
* Data can only be flushed to disk if a two-level aggregation structure is used.
* Data can only be flushed to disk if a two-level aggregation is supported.
*/
if (params.max_bytes_before_external_group_by
&& result.isTwoLevel()
&& current_memory_usage > static_cast<Int64>(params.max_bytes_before_external_group_by)
&& worth_convert_to_two_level)
{
/// todo: the memory usage is calculated by memory_tracker, it is not accurate since memory tracker
/// will tracker all the memory usage for the task/query, need to record and maintain the memory usage
/// in Aggregator directly.
if (max_bytes_before_external_group_by
&& (result.isTwoLevel() || result.isConvertibleToTwoLevel())
&& result_size_bytes > max_bytes_before_external_group_by)
{
if (!result.isTwoLevel())
result.convertToTwoLevel();
spill(result);
}

Expand All @@ -866,6 +845,13 @@ BlockInputStreams Aggregator::restoreSpilledData()
return spiller->restoreBlocks(0);
}

/// Initializes the thresholds this aggregator checks, given how many
/// AggregatedDataVariants the work is spread over.
///
/// The two byte-based limits taken from `params` are passed through
/// getAverageThreshold() together with `aggregated_data_variants_size`;
/// the row-count two-level threshold is copied from `params` unchanged.
void Aggregator::initThresholdByAggregatedDataVariantsSize(size_t aggregated_data_variants_size)
{
    // Byte-based limits: averaged over the number of data variants.
    max_bytes_before_external_group_by = getAverageThreshold(
        params.getMaxBytesBeforeExternalGroupBy(),
        aggregated_data_variants_size);
    group_by_two_level_threshold_bytes = getAverageThreshold(
        params.getGroupByTwoLevelThresholdBytes(),
        aggregated_data_variants_size);

    // Row-count limit: taken as configured, not averaged.
    group_by_two_level_threshold = params.getGroupByTwoLevelThreshold();
}

void Aggregator::spill(AggregatedDataVariants & data_variants)
{
/// Flush only two-level data and possibly overflow data.
Expand Down Expand Up @@ -992,14 +978,14 @@ void Aggregator::execute(const BlockInputStreamPtr & stream, AggregatedDataVaria
src_rows += block.rows();
src_bytes += block.bytes();

if (!executeOnBlock(block, result, key_columns, aggregate_columns, params.local_delta_memory))
if (!executeOnBlock(block, result, key_columns, aggregate_columns))
break;
}

/// If there was no data and we aggregate without keys, we must return a single row with the result of empty aggregation.
/// To do this, we pass a block with zero rows to aggregate.
if (result.empty() && params.keys_size == 0 && !params.empty_result_for_aggregation_by_empty_set)
executeOnBlock(stream->getHeader(), result, key_columns, aggregate_columns, params.local_delta_memory);
executeOnBlock(stream->getHeader(), result, key_columns, aggregate_columns);

double elapsed_seconds = watch.elapsedSeconds();
size_t rows = result.size();
Expand Down
Loading

0 comments on commit 8942a86

Please sign in to comment.