Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refine the spill threshold for aggregation/sort, refine the memory usage calculation for aggregation #6708

Merged
merged 11 commits into from
Feb 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions dbms/src/Common/ThresholdUtils.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
// Copyright 2023 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include <assert.h>

#include <algorithm>

inline size_t getAverageThreshold(size_t threshold, size_t concurrency)
{
assert(concurrency > 0);
if (threshold == 0)
return 0;
return std::max(static_cast<size_t>(1), threshold / concurrency);
}
1 change: 1 addition & 0 deletions dbms/src/DataStreams/AggregatingBlockInputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ Block AggregatingBlockInputStream::readImpl()
return this->isCancelled();
};
aggregator.setCancellationHook(hook);
aggregator.initThresholdByAggregatedDataVariantsSize(1);

aggregator.execute(children.back(), *data_variants);

Expand Down
7 changes: 3 additions & 4 deletions dbms/src/DataStreams/ParallelAggregatingBlockInputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,7 @@ void ParallelAggregatingBlockInputStream::Handler::onBlock(Block & block, size_t
block,
*parent.many_data[thread_num],
parent.threads_data[thread_num].key_columns,
parent.threads_data[thread_num].aggregate_columns,
parent.threads_data[thread_num].local_delta_memory);
parent.threads_data[thread_num].aggregate_columns);

parent.threads_data[thread_num].src_rows += block.rows();
parent.threads_data[thread_num].src_bytes += block.bytes();
Expand Down Expand Up @@ -173,6 +172,7 @@ void ParallelAggregatingBlockInputStream::execute()

for (size_t i = 0; i < max_threads; ++i)
threads_data.emplace_back(keys_size, aggregates_size);
aggregator.initThresholdByAggregatedDataVariantsSize(many_data.size());

LOG_TRACE(log, "Aggregating");

Expand Down Expand Up @@ -226,8 +226,7 @@ void ParallelAggregatingBlockInputStream::execute()
children.at(0)->getHeader(),
*many_data[0],
threads_data[0].key_columns,
threads_data[0].aggregate_columns,
threads_data[0].local_delta_memory);
threads_data[0].aggregate_columns);
}

void ParallelAggregatingBlockInputStream::appendInfo(FmtBuffer & buffer) const
Expand Down
1 change: 0 additions & 1 deletion dbms/src/DataStreams/ParallelAggregatingBlockInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,6 @@ class ParallelAggregatingBlockInputStream : public IProfilingBlockInputStream
{
size_t src_rows = 0;
size_t src_bytes = 0;
Int64 local_delta_memory = 0;

ColumnRawPtrs key_columns;
Aggregator::AggregateColumns aggregate_columns;
Expand Down
9 changes: 7 additions & 2 deletions dbms/src/Flash/Coprocessor/AggregationInterpreterHelper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include <Common/ThresholdUtils.h>
#include <Common/TiFlashException.h>
#include <Core/ColumnNumbers.h>
#include <Flash/Coprocessor/AggregationInterpreterHelper.h>
Expand Down Expand Up @@ -76,6 +77,7 @@ Aggregator::Params buildParams(
const Context & context,
const Block & before_agg_header,
size_t before_agg_streams_size,
size_t agg_streams_size,
const Names & key_names,
const TiDB::TiDBCollators & collators,
const AggregateDescriptions & aggregate_descriptions,
Expand All @@ -91,16 +93,19 @@ Aggregator::Params buildParams(
const Settings & settings = context.getSettingsRef();

bool allow_to_use_two_level_group_by = isAllowToUseTwoLevelGroupBy(before_agg_streams_size, settings);
auto total_two_level_threshold_bytes = allow_to_use_two_level_group_by ? settings.group_by_two_level_threshold_bytes : SettingUInt64(0);

bool has_collator = std::any_of(begin(collators), end(collators), [](const auto & p) { return p != nullptr; });

return Aggregator::Params(
before_agg_header,
keys,
aggregate_descriptions,
/// do not use the average value for key count threshold, because for a random distributed data, the key count
/// in every threads should almost be the same
allow_to_use_two_level_group_by ? settings.group_by_two_level_threshold : SettingUInt64(0),
allow_to_use_two_level_group_by ? settings.group_by_two_level_threshold_bytes : SettingUInt64(0),
settings.max_bytes_before_external_group_by,
getAverageThreshold(total_two_level_threshold_bytes, agg_streams_size),
getAverageThreshold(settings.max_bytes_before_external_group_by, agg_streams_size),
!is_final_agg,
spill_config,
context.getSettingsRef().max_block_size,
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Flash/Coprocessor/AggregationInterpreterHelper.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ Aggregator::Params buildParams(
const Context & context,
const Block & before_agg_header,
size_t before_agg_streams_size,
size_t agg_streams_size,
const Names & key_names,
const TiDB::TiDBCollators & collators,
const AggregateDescriptions & aggregate_descriptions,
Expand Down
2 changes: 2 additions & 0 deletions dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

#include <Common/FailPoint.h>
#include <Common/ThresholdUtils.h>
#include <Common/TiFlashException.h>
#include <Core/NamesAndTypes.h>
#include <DataStreams/AggregatingBlockInputStream.h>
Expand Down Expand Up @@ -397,6 +398,7 @@ void DAGQueryBlockInterpreter::executeAggregation(
context,
before_agg_header,
pipeline.streams.size(),
enable_fine_grained_shuffle ? pipeline.streams.size() : 1,
key_names,
collators,
aggregate_descriptions,
Expand Down
1 change: 0 additions & 1 deletion dbms/src/Flash/Coprocessor/DAGUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1447,5 +1447,4 @@ tipb::ScalarFuncSig reverseGetFuncSigByFuncName(const String & name)
throw Exception(fmt::format("Unsupported function {}", name));
return func_name_sig_map[name];
}

} // namespace DB
1 change: 0 additions & 1 deletion dbms/src/Flash/Coprocessor/DAGUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -105,5 +105,4 @@ class UniqueNameGenerator
tipb::DAGRequest getDAGRequestFromStringWithRetry(const String & s);
tipb::EncodeType analyzeDAGEncodeType(DAGContext & dag_context);
tipb::ScalarFuncSig reverseGetFuncSigByFuncName(const String & name);

} // namespace DB
3 changes: 2 additions & 1 deletion dbms/src/Flash/Coprocessor/InterpreterUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include <Common/ThresholdUtils.h>
#include <DataStreams/CreatingSetsBlockInputStream.h>
#include <DataStreams/ExpressionBlockInputStream.h>
#include <DataStreams/FilterBlockInputStream.h>
Expand Down Expand Up @@ -120,7 +121,7 @@ void orderStreams(
order_descr,
settings.max_block_size,
limit,
settings.max_bytes_before_external_sort,
getAverageThreshold(settings.max_bytes_before_external_sort, pipeline.streams.size()),
SpillConfig(context.getTemporaryPath(), fmt::format("{}_sort", log->identifier()), settings.max_spilled_size_per_spill, context.getFileProvider()),
log->identifier());
stream->setExtraInfo(String(enableFineGrainedShuffleExtraInfo));
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Flash/Planner/Plans/PhysicalAggregation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ void PhysicalAggregation::buildBlockInputStreamImpl(DAGPipeline & pipeline, Cont
context,
before_agg_header,
pipeline.streams.size(),
fine_grained_shuffle.enable() ? pipeline.streams.size() : 1,
aggregation_keys,
aggregation_collators,
aggregate_descriptions,
Expand Down
64 changes: 25 additions & 39 deletions dbms/src/Interpreters/Aggregator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,12 @@
#include <AggregateFunctions/AggregateFunctionArray.h>
#include <AggregateFunctions/AggregateFunctionCount.h>
#include <AggregateFunctions/AggregateFunctionState.h>
#include <Columns/ColumnTuple.h>
#include <Common/ClickHouseRevision.h>
#include <Common/FailPoint.h>
#include <Common/MemoryTracker.h>
#include <Common/Stopwatch.h>
#include <Common/ThreadManager.h>
#include <Common/ThresholdUtils.h>
#include <Common/typeid_cast.h>
#include <Common/wrapInvocable.h>
#include <DataStreams/IProfilingBlockInputStream.h>
#include <DataStreams/MergingAndConvertingBlockInputStream.h>
#include <DataStreams/NativeBlockOutputStream.h>
#include <DataStreams/NullBlockInputStream.h>
#include <DataStreams/materializeBlock.h>
#include <DataTypes/DataTypeAggregateFunction.h>
Expand All @@ -34,14 +29,10 @@
#include <IO/CompressedWriteBuffer.h>
#include <Interpreters/Aggregator.h>
#include <Storages/Transaction/CollatorUtils.h>
#include <common/demangle.h>

#include <array>
#include <cassert>
#include <cstddef>
#include <future>
#include <iomanip>
#include <thread>

namespace DB
{
Expand Down Expand Up @@ -177,6 +168,7 @@ void AggregatedDataVariants::convertToTwoLevel()
default:
throw Exception("Wrong data variant passed.", ErrorCodes::LOGICAL_ERROR);
}
aggregator->useTwoLevelHashTable();
}


Expand Down Expand Up @@ -240,9 +232,6 @@ Aggregator::Aggregator(const Params & params_, const String & req_id)
, log(Logger::get(req_id))
, is_cancelled([]() { return false; })
{
if (current_memory_tracker)
memory_usage_before_aggregation = current_memory_tracker->get();

aggregate_functions.resize(params.aggregates_size);
for (size_t i = 0; i < params.aggregates_size; ++i)
aggregate_functions[i] = params.aggregates[i].function.get();
Expand Down Expand Up @@ -735,8 +724,7 @@ bool Aggregator::executeOnBlock(
const Block & block,
AggregatedDataVariants & result,
ColumnRawPtrs & key_columns,
AggregateColumns & aggregate_columns,
Int64 & local_delta_memory)
AggregateColumns & aggregate_columns)
{
if (is_cancelled())
return true;
Expand Down Expand Up @@ -814,21 +802,14 @@ bool Aggregator::executeOnBlock(
}

size_t result_size = result.size();
Int64 current_memory_usage = 0;
if (current_memory_tracker)
{
current_memory_usage = current_memory_tracker->get();
auto updated_local_delta_memory = CurrentMemoryTracker::getLocalDeltaMemory();
auto local_delta_memory_diff = updated_local_delta_memory - local_delta_memory;
current_memory_usage += (local_memory_usage.fetch_add(local_delta_memory_diff) + local_delta_memory_diff);
local_delta_memory = updated_local_delta_memory;
}

auto result_size_bytes = current_memory_usage - memory_usage_before_aggregation; /// Here all the results in the sum are taken into account, from different threads.
auto result_size_bytes = result.bytesCount();

/// worth_convert_to_two_level is set to true if
/// 1. some other threads already convert to two level
/// 2. the result size exceeds threshold
bool worth_convert_to_two_level
= (params.group_by_two_level_threshold && result_size >= params.group_by_two_level_threshold)
|| (params.group_by_two_level_threshold_bytes && result_size_bytes >= static_cast<Int64>(params.group_by_two_level_threshold_bytes));
= use_two_level_hash_table || (group_by_two_level_threshold && result_size >= group_by_two_level_threshold)
|| (group_by_two_level_threshold_bytes && result_size_bytes >= group_by_two_level_threshold_bytes);

/** Converting to a two-level data structure.
* It allows you to make, in the subsequent, an effective merge - either economical from memory or parallel.
Expand All @@ -837,16 +818,14 @@ bool Aggregator::executeOnBlock(
result.convertToTwoLevel();

/** Flush data to disk if too much RAM is consumed.
* Data can only be flushed to disk if a two-level aggregation structure is used.
* Data can only be flushed to disk if a two-level aggregation is supported.
*/
if (params.max_bytes_before_external_group_by
&& result.isTwoLevel()
&& current_memory_usage > static_cast<Int64>(params.max_bytes_before_external_group_by)
&& worth_convert_to_two_level)
{
/// todo: the memory usage is calculated by memory_tracker, it is not accurate since memory tracker
/// will tracker all the memory usage for the task/query, need to record and maintain the memory usage
/// in Aggregator directly.
if (max_bytes_before_external_group_by
&& (result.isTwoLevel() || result.isConvertibleToTwoLevel())
&& result_size_bytes > max_bytes_before_external_group_by)
{
if (!result.isTwoLevel())
result.convertToTwoLevel();
spill(result);
}

Expand All @@ -866,6 +845,13 @@ BlockInputStreams Aggregator::restoreSpilledData()
return spiller->restoreBlocks(0);
}

void Aggregator::initThresholdByAggregatedDataVariantsSize(size_t aggregated_data_variants_size)
{
group_by_two_level_threshold = params.getGroupByTwoLevelThreshold();
group_by_two_level_threshold_bytes = getAverageThreshold(params.getGroupByTwoLevelThresholdBytes(), aggregated_data_variants_size);
max_bytes_before_external_group_by = getAverageThreshold(params.getMaxBytesBeforeExternalGroupBy(), aggregated_data_variants_size);
}

void Aggregator::spill(AggregatedDataVariants & data_variants)
{
/// Flush only two-level data and possibly overflow data.
Expand Down Expand Up @@ -992,14 +978,14 @@ void Aggregator::execute(const BlockInputStreamPtr & stream, AggregatedDataVaria
src_rows += block.rows();
src_bytes += block.bytes();

if (!executeOnBlock(block, result, key_columns, aggregate_columns, params.local_delta_memory))
if (!executeOnBlock(block, result, key_columns, aggregate_columns))
break;
}

/// If there was no data, and we aggregate without keys, and we must return single row with the result of empty aggregation.
/// To do this, we pass a block with zero rows to aggregate.
if (result.empty() && params.keys_size == 0 && !params.empty_result_for_aggregation_by_empty_set)
executeOnBlock(stream->getHeader(), result, key_columns, aggregate_columns, params.local_delta_memory);
executeOnBlock(stream->getHeader(), result, key_columns, aggregate_columns);

double elapsed_seconds = watch.elapsedSeconds();
size_t rows = result.size();
Expand Down
Loading