Skip to content

Commit

Permalink
address comments
Browse files Browse the repository at this point in the history
Signed-off-by: xufei <xufeixw@mail.ustc.edu.cn>
  • Loading branch information
windtalker committed Feb 6, 2023
1 parent 79bffe3 commit 3435b39
Show file tree
Hide file tree
Showing 10 changed files with 48 additions and 27 deletions.
25 changes: 25 additions & 0 deletions dbms/src/Common/ThresholdUtils.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
// Copyright 2022 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include <algorithm>

inline size_t getAverageThreshold(size_t threshold, size_t concurrency)
{
assert(concurrency > 0);
if (threshold == 0)
return 0;
return std::max(static_cast<size_t>(1), threshold / concurrency);
}
1 change: 1 addition & 0 deletions dbms/src/DataStreams/AggregatingBlockInputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ Block AggregatingBlockInputStream::readImpl()
return this->isCancelled();
};
aggregator.setCancellationHook(hook);
aggregator.initThresholdByAggregatedDataVariantsSize(1);

aggregator.execute(children.back(), *data_variants);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ void ParallelAggregatingBlockInputStream::execute()

for (size_t i = 0; i < max_threads; ++i)
threads_data.emplace_back(keys_size, aggregates_size);
aggregator.initThresholdByAggregatedDataVariantsSize(many_data.size());

LOG_TRACE(log, "Aggregating");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include <Common/ThresholdUtils.h>
#include <Common/TiFlashException.h>
#include <Core/ColumnNumbers.h>
#include <Flash/Coprocessor/AggregationInterpreterHelper.h>
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

#include <Common/FailPoint.h>
#include <Common/ThresholdUtils.h>
#include <Common/TiFlashException.h>
#include <Core/NamesAndTypes.h>
#include <DataStreams/AggregatingBlockInputStream.h>
Expand Down
9 changes: 0 additions & 9 deletions dbms/src/Flash/Coprocessor/DAGUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1447,13 +1447,4 @@ tipb::ScalarFuncSig reverseGetFuncSigByFuncName(const String & name)
throw Exception(fmt::format("Unsupported function {}", name));
return func_name_sig_map[name];
}

size_t getAverageThreshold(size_t threshold, size_t concurrency)
{
assert(concurrency > 0);
if (threshold == 0)
return 0;
return std::max(1, threshold / concurrency);
}

} // namespace DB
3 changes: 0 additions & 3 deletions dbms/src/Flash/Coprocessor/DAGUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,4 @@ class UniqueNameGenerator
tipb::DAGRequest getDAGRequestFromStringWithRetry(const String & s);
tipb::EncodeType analyzeDAGEncodeType(DAGContext & dag_context);
tipb::ScalarFuncSig reverseGetFuncSigByFuncName(const String & name);

size_t getAverageThreshold(size_t threshold, size_t concurrency);

} // namespace DB
3 changes: 2 additions & 1 deletion dbms/src/Flash/Coprocessor/InterpreterUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include <Common/ThresholdUtils.h>
#include <DataStreams/CreatingSetsBlockInputStream.h>
#include <DataStreams/ExpressionBlockInputStream.h>
#include <DataStreams/FilterBlockInputStream.h>
Expand Down Expand Up @@ -120,7 +121,7 @@ void orderStreams(
order_descr,
settings.max_block_size,
limit,
getAverageThreshold(settings.max_bytes_before_external_sort, std::min(max_streams, pipeline.streams.size())),
getAverageThreshold(settings.max_bytes_before_external_sort, pipeline.streams.size()),
SpillConfig(context.getTemporaryPath(), fmt::format("{}_sort", log->identifier()), settings.max_spilled_size_per_spill, context.getFileProvider()),
log->identifier());
stream->setExtraInfo(String(enableFineGrainedShuffleExtraInfo));
Expand Down
26 changes: 12 additions & 14 deletions dbms/src/Interpreters/Aggregator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,12 @@
#include <AggregateFunctions/AggregateFunctionArray.h>
#include <AggregateFunctions/AggregateFunctionCount.h>
#include <AggregateFunctions/AggregateFunctionState.h>
#include <Columns/ColumnTuple.h>
#include <Common/ClickHouseRevision.h>
#include <Common/FailPoint.h>
#include <Common/MemoryTracker.h>
#include <Common/Stopwatch.h>
#include <Common/ThreadManager.h>
#include <Common/ThresholdUtils.h>
#include <Common/typeid_cast.h>
#include <Common/wrapInvocable.h>
#include <DataStreams/IProfilingBlockInputStream.h>
#include <DataStreams/MergingAndConvertingBlockInputStream.h>
#include <DataStreams/NativeBlockOutputStream.h>
#include <DataStreams/NullBlockInputStream.h>
#include <DataStreams/materializeBlock.h>
#include <DataTypes/DataTypeAggregateFunction.h>
Expand All @@ -34,14 +29,10 @@
#include <IO/CompressedWriteBuffer.h>
#include <Interpreters/Aggregator.h>
#include <Storages/Transaction/CollatorUtils.h>
#include <common/demangle.h>

#include <array>
#include <cassert>
#include <cstddef>
#include <future>
#include <iomanip>
#include <thread>

namespace DB
{
Expand Down Expand Up @@ -817,8 +808,8 @@ bool Aggregator::executeOnBlock(
/// 1. some other threads already convert to two level
/// 2. the result size exceeds threshold
bool worth_convert_to_two_level
= use_two_level_hash_table || (params.group_by_two_level_threshold && result_size >= params.group_by_two_level_threshold)
|| (params.group_by_two_level_threshold_bytes && result_size_bytes >= params.group_by_two_level_threshold_bytes);
= use_two_level_hash_table || (group_by_two_level_threshold && result_size >= group_by_two_level_threshold)
|| (group_by_two_level_threshold_bytes && result_size_bytes >= group_by_two_level_threshold_bytes);

/** Converting to a two-level data structure.
* It allows you to make, in the subsequent, an effective merge - either economical from memory or parallel.
Expand All @@ -829,9 +820,9 @@ bool Aggregator::executeOnBlock(
/** Flush data to disk if too much RAM is consumed.
* Data can only be flushed to disk if a two-level aggregation is supported.
*/
if (params.max_bytes_before_external_group_by
if (max_bytes_before_external_group_by
&& (result.isTwoLevel() || result.isConvertibleToTwoLevel())
&& result_size_bytes > params.max_bytes_before_external_group_by)
&& result_size_bytes > max_bytes_before_external_group_by)
{
if (!result.isTwoLevel())
result.convertToTwoLevel();
Expand All @@ -854,6 +845,13 @@ BlockInputStreams Aggregator::restoreSpilledData()
return spiller->restoreBlocks(0);
}

void Aggregator::initThresholdByAggregatedDataVariantsSize(size_t aggregated_data_variants_size)
{
group_by_two_level_threshold = params.group_by_two_level_threshold;
group_by_two_level_threshold_bytes = getAverageThreshold(params.group_by_two_level_threshold_bytes, aggregated_data_variants_size);
max_bytes_before_external_group_by = getAverageThreshold(params.max_bytes_before_external_group_by, aggregated_data_variants_size);
}

void Aggregator::spill(AggregatedDataVariants & data_variants)
{
/// Flush only two-level data and possibly overflow data.
Expand Down
5 changes: 5 additions & 0 deletions dbms/src/Interpreters/Aggregator.h
Original file line number Diff line number Diff line change
Expand Up @@ -1047,6 +1047,7 @@ class Aggregator
BlockInputStreams restoreSpilledData();
bool hasSpilledData() const { return spiller != nullptr && spiller->hasSpilledData(); }
void useTwoLevelHashTable() { use_two_level_hash_table = true; }
void initThresholdByAggregatedDataVariantsSize(size_t aggregated_data_variants_size);

/// Get data structure of the result.
Block getHeader(bool final) const;
Expand Down Expand Up @@ -1100,6 +1101,10 @@ class Aggregator
/// Returns true if you can abort the current task.
CancellationHook is_cancelled;

size_t group_by_two_level_threshold = 0;
size_t group_by_two_level_threshold_bytes = 0;
size_t max_bytes_before_external_group_by = 0;

/// For external aggregation.
std::unique_ptr<Spiller> spiller;

Expand Down

0 comments on commit 3435b39

Please sign in to comment.