Skip to content

Commit

Permalink
Add more spill tests and fix bug (#7073)
Browse files Browse the repository at this point in the history
ref #6528
  • Loading branch information
windtalker authored Mar 15, 2023
1 parent 1c181a2 commit 35e2f8d
Show file tree
Hide file tree
Showing 7 changed files with 322 additions and 31 deletions.
15 changes: 3 additions & 12 deletions dbms/src/Columns/ColumnString.h
Original file line number Diff line number Diff line change
Expand Up @@ -249,20 +249,11 @@ class ColumnString final : public COWPtrHelper<IColumn, ColumnString>
{
const size_t string_size = *reinterpret_cast<const size_t *>(pos);
pos += sizeof(string_size);

if (likely(collator))
{
// https://github.com/pingcap/tiflash/pull/6135
// - Generate empty string column
// - Make size of `offsets` as previous way for func `ColumnString::size()`
offsets.push_back(0);
return pos + string_size;
}
if (likely(collator != nullptr))
insertData(pos, string_size);
else
{
insertDataWithTerminatingZero(pos, string_size);
return pos + string_size;
}
return pos + string_size;
}

void updateHashWithValue(size_t n, SipHash & hash, const TiDB::TiDBCollatorPtr & collator, String & sort_key_container) const override
Expand Down
2 changes: 2 additions & 0 deletions dbms/src/Columns/IColumn.h
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,8 @@ class IColumn : public COWPtr<IColumn>
* 2. The input parameter `collator` does not work well for complex columns(column tuple),
* but it is only used by TiDB , which does not support complex columns, so just ignore
* the complex column will be ok.
* 3. Even if the restored column will be discarded, deserializeAndInsertFromArena still need to
* insert the data because when spill happens, this column will be used during the merge agg stage.
*/
virtual const char * deserializeAndInsertFromArena(const char * pos, const TiDB::TiDBCollatorPtr & collator) = 0;
const char * deserializeAndInsertFromArena(const char * pos) { return deserializeAndInsertFromArena(pos, nullptr); }
Expand Down
264 changes: 263 additions & 1 deletion dbms/src/Flash/tests/gtest_spill_aggregation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ namespace DB
{
namespace tests
{

class SpillAggregationTestRunner : public DB::tests::ExecutorTest
{
public:
Expand Down Expand Up @@ -96,5 +95,268 @@ try
ASSERT_COLUMNS_EQ_UR(ref_columns, vstackBlocks(std::move(blocks)).getColumnsWithTypeAndName());
}
CATCH

TEST_F(SpillAggregationTestRunner, AggWithSpecialGroupKey)
try
{
/// prepare data
size_t unique_rows = 3000;
DB::MockColumnInfoVec table_column_infos{{"key_8", TiDB::TP::TypeTiny, false}, {"key_16", TiDB::TP::TypeShort, false}, {"key_32", TiDB::TP::TypeLong, false}, {"key_64", TiDB::TP::TypeLongLong, false}, {"key_string_1", TiDB::TP::TypeString, false}, {"key_string_2", TiDB::TP::TypeString, false}, {"value", TiDB::TP::TypeLong, false}};
ColumnsWithTypeAndName table_column_data;
for (const auto & column_info : mockColumnInfosToTiDBColumnInfos(table_column_infos))
{
ColumnGeneratorOpts opts{unique_rows, getDataTypeByColumnInfoForComputingLayer(column_info)->getName(), RANDOM, column_info.name};
table_column_data.push_back(ColumnGenerator::instance().generate(opts));
}
for (auto & table_column : table_column_data)
{
if (table_column.name != "value")
table_column.column->assumeMutable()->insertRangeFrom(*table_column.column, 0, unique_rows / 2);
else
{
ColumnGeneratorOpts opts{unique_rows / 2, table_column.type->getName(), RANDOM, table_column.name};
auto column = ColumnGenerator::instance().generate(opts);
table_column.column->assumeMutable()->insertRangeFrom(*column.column, 0, unique_rows / 2);
}
}
ColumnWithTypeAndName shuffle_column = ColumnGenerator::instance().generate({unique_rows + unique_rows / 2, "UInt64", RANDOM});
IColumn::Permutation perm;
shuffle_column.column->getPermutation(false, 0, -1, perm);
for (auto & column : table_column_data)
{
column.column = column.column->permute(perm, 0);
}

context.addMockTable("test_db", "agg_table_with_special_key", table_column_infos, table_column_data);

size_t max_block_size = 800;
size_t max_bytes_before_external_agg = 100;
std::vector<size_t> concurrences{1, 8};
std::vector<Int64> collators{TiDB::ITiDBCollator::UTF8MB4_BIN, TiDB::ITiDBCollator::UTF8MB4_GENERAL_CI};
std::vector<std::vector<String>> group_by_keys{
/// fast path with one int and one string in bin collation
{"key_64", "key_string_1"},
/// fast path with two string in bin collation
{"key_string_1", "key_string_2"},
/// fast path with one string in bin collation
{"key_string_1"},
/// keys need to be shuffled
{"key_8", "key_16", "key_32", "key_64"},
};
std::vector<std::vector<ASTPtr>> agg_funcs{{Max(col("value"))}, {Max(col("value")), Min(col("value"))}};
for (auto collator_id : collators)
{
for (const auto & keys : group_by_keys)
{
for (const auto & agg_func : agg_funcs)
{
context.setCollation(collator_id);
const auto * current_collator = TiDB::ITiDBCollator::getCollator(collator_id);
ASSERT_TRUE(current_collator != nullptr);
SortDescription sd;
bool has_string_key = false;
MockAstVec key_vec;
for (const auto & key : keys)
key_vec.push_back(col(key));
auto request = context
.scan("test_db", "agg_table_with_special_key")
.aggregation(agg_func, key_vec)
.build(context);
/// use one level, no block split, no spill as the reference
context.context->setSetting("group_by_two_level_threshold_bytes", Field(static_cast<UInt64>(0)));
context.context->setSetting("max_bytes_before_external_group_by", Field(static_cast<UInt64>(0)));
context.context->setSetting("max_block_size", Field(static_cast<UInt64>(unique_rows * 2)));
/// here has to enable memory tracker otherwise the processList in the context is the last query's processList
/// and may cause segment fault, maybe a bug but should not happens in TiDB because all the tasks from tidb
/// enable memory tracker
auto reference = executeStreams(request, 1, true);
if (current_collator->isCI())
{
/// for ci collation, need to sort and compare the result manually
for (const auto & result_col : reference)
{
if (!removeNullable(result_col.type)->isString())
{
sd.push_back(SortColumnDescription(result_col.name, 1, 1, nullptr));
}
else
{
sd.push_back(SortColumnDescription(result_col.name, 1, 1, current_collator));
has_string_key = true;
}
}
/// don't run ci test if there is no string key
if (!has_string_key)
continue;
Block tmp_block(reference);
sortBlock(tmp_block, sd);
reference = tmp_block.getColumnsWithTypeAndName();
}
for (auto concurrency : concurrences)
{
context.context->setSetting("group_by_two_level_threshold", Field(static_cast<UInt64>(1)));
context.context->setSetting("group_by_two_level_threshold_bytes", Field(static_cast<UInt64>(1)));
context.context->setSetting("max_bytes_before_external_group_by", Field(static_cast<UInt64>(max_bytes_before_external_agg)));
context.context->setSetting("max_block_size", Field(static_cast<UInt64>(max_block_size)));
auto blocks = getExecuteStreamsReturnBlocks(request, concurrency, true);
for (auto & block : blocks)
{
block.checkNumberOfRows();
ASSERT(block.rows() <= max_block_size);
}
if (current_collator->isCI())
{
auto merged_block = vstackBlocks(std::move(blocks));
sortBlock(merged_block, sd);
auto merged_columns = merged_block.getColumnsWithTypeAndName();
for (size_t col_index = 0; col_index < reference.size(); col_index++)
ASSERT_TRUE(columnEqual(reference[col_index].column, merged_columns[col_index].column, sd[col_index].collator));
}
else
{
ASSERT_TRUE(columnsEqual(reference, vstackBlocks(std::move(blocks)).getColumnsWithTypeAndName(), false));
}
}
}
}
}
}
CATCH

TEST_F(SpillAggregationTestRunner, AggWithDistinctAggFunc)
try
{
/// prepare data
size_t unique_rows = 3000;
DB::MockColumnInfoVec table_column_infos{
{"key_8", TiDB::TP::TypeTiny, false},
{"key_16", TiDB::TP::TypeShort, false},
{"key_32", TiDB::TP::TypeLong, false},
{"key_64", TiDB::TP::TypeLongLong, false},
{"key_string_1", TiDB::TP::TypeString, false},
{"key_string_2", TiDB::TP::TypeString, false},
{"value_1", TiDB::TP::TypeString, false},
{"value_2", TiDB::TP::TypeLong, false},
};
size_t key_column = 6;
ColumnsWithTypeAndName table_column_data;
for (const auto & column_info : mockColumnInfosToTiDBColumnInfos(table_column_infos))
{
ColumnGeneratorOpts opts{unique_rows, getDataTypeByColumnInfoForComputingLayer(column_info)->getName(), RANDOM, column_info.name};
table_column_data.push_back(ColumnGenerator::instance().generate(opts));
}
for (size_t i = 0; i < key_column; i++)
table_column_data[i].column->assumeMutable()->insertRangeFrom(*table_column_data[i].column, 0, unique_rows / 2);
for (size_t i = key_column; i < table_column_data.size(); i++)
{
auto & table_column = table_column_data[i];
ColumnGeneratorOpts opts{unique_rows / 2, table_column.type->getName(), RANDOM, table_column.name};
auto column = ColumnGenerator::instance().generate(opts);
table_column.column->assumeMutable()->insertRangeFrom(*column.column, 0, unique_rows / 2);
}

ColumnWithTypeAndName shuffle_column = ColumnGenerator::instance().generate({unique_rows + unique_rows / 2, "UInt64", RANDOM});
IColumn::Permutation perm;
shuffle_column.column->getPermutation(false, 0, -1, perm);
for (auto & column : table_column_data)
{
column.column = column.column->permute(perm, 0);
}

context.addMockTable("test_db", "agg_table_with_special_key", table_column_infos, table_column_data);

size_t max_block_size = 800;
size_t max_bytes_before_external_agg = 100;
std::vector<size_t> concurrences{1, 8};
std::vector<Int64> collators{TiDB::ITiDBCollator::UTF8MB4_BIN, TiDB::ITiDBCollator::UTF8MB4_GENERAL_CI};
std::vector<std::vector<String>> group_by_keys{
/// fast path with one int and one string
{"key_64", "key_string_1"},
/// fast path with two string
{"key_string_1", "key_string_2"},
/// fast path with one string
{"key_string_1"},
/// keys need to be shuffled
{"key_8", "key_16", "key_32", "key_64"},
};
std::vector<std::vector<ASTPtr>> agg_funcs{{Max(col("value_1")), CountDistinct(col("value_2"))}, {CountDistinct(col("value_1")), CountDistinct(col("value_2"))}, {CountDistinct(col("value_1"))}};
for (auto collator_id : collators)
{
for (const auto & keys : group_by_keys)
{
for (const auto & agg_func : agg_funcs)
{
context.setCollation(collator_id);
const auto * current_collator = TiDB::ITiDBCollator::getCollator(collator_id);
ASSERT_TRUE(current_collator != nullptr);
SortDescription sd;
bool has_string_key = false;
MockAstVec key_vec;
for (const auto & key : keys)
key_vec.push_back(col(key));
auto request = context
.scan("test_db", "agg_table_with_special_key")
.aggregation(agg_func, key_vec)
.build(context);
/// use one level, no block split, no spill as the reference
context.context->setSetting("group_by_two_level_threshold_bytes", Field(static_cast<UInt64>(0)));
context.context->setSetting("max_bytes_before_external_group_by", Field(static_cast<UInt64>(0)));
context.context->setSetting("max_block_size", Field(static_cast<UInt64>(unique_rows * 2)));
/// here has to enable memory tracker otherwise the processList in the context is the last query's processList
/// and may cause segment fault, maybe a bug but should not happens in TiDB because all the tasks from tidb
/// enable memory tracker
auto reference = executeStreams(request, 1, true);
if (current_collator->isCI())
{
/// for ci collation, need to sort and compare the result manually
for (const auto & result_col : reference)
{
if (!removeNullable(result_col.type)->isString())
{
sd.push_back(SortColumnDescription(result_col.name, 1, 1, nullptr));
}
else
{
sd.push_back(SortColumnDescription(result_col.name, 1, 1, current_collator));
has_string_key = true;
}
}
/// don't run ci test if there is no string key
if (!has_string_key)
continue;
Block tmp_block(reference);
sortBlock(tmp_block, sd);
reference = tmp_block.getColumnsWithTypeAndName();
}
for (auto concurrency : concurrences)
{
context.context->setSetting("group_by_two_level_threshold", Field(static_cast<UInt64>(1)));
context.context->setSetting("group_by_two_level_threshold_bytes", Field(static_cast<UInt64>(1)));
context.context->setSetting("max_bytes_before_external_group_by", Field(static_cast<UInt64>(max_bytes_before_external_agg)));
context.context->setSetting("max_block_size", Field(static_cast<UInt64>(max_block_size)));
auto blocks = getExecuteStreamsReturnBlocks(request, concurrency, true);
for (auto & block : blocks)
{
block.checkNumberOfRows();
ASSERT(block.rows() <= max_block_size);
}
if (current_collator->isCI())
{
auto merged_block = vstackBlocks(std::move(blocks));
sortBlock(merged_block, sd);
auto merged_columns = merged_block.getColumnsWithTypeAndName();
for (size_t col_index = 0; col_index < reference.size(); col_index++)
ASSERT_TRUE(columnEqual(reference[col_index].column, merged_columns[col_index].column, sd[col_index].collator));
}
else
{
ASSERT_TRUE(columnsEqual(reference, vstackBlocks(std::move(blocks)).getColumnsWithTypeAndName(), false));
}
}
}
}
}
}
CATCH
} // namespace tests
} // namespace DB
44 changes: 43 additions & 1 deletion dbms/src/Flash/tests/gtest_spill_sort.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ namespace DB
{
namespace tests
{

class SpillSortTestRunner : public DB::tests::ExecutorTest
{
public:
Expand Down Expand Up @@ -70,5 +69,48 @@ try
ASSERT_COLUMNS_EQ_UR(ref_columns, executeStreams(request, original_max_streams));
}
CATCH

TEST_F(SpillSortTestRunner, CollatorTest)
try
{
DB::MockColumnInfoVec column_infos{{"a", TiDB::TP::TypeString, false}, {"b", TiDB::TP::TypeString, false}, {"c", TiDB::TP::TypeString, false}, {"d", TiDB::TP::TypeString, false}, {"e", TiDB::TP::TypeString, false}};
ColumnsWithTypeAndName column_data;
size_t table_rows = 102400;
UInt64 max_block_size = 500;
size_t original_max_streams = 20;
size_t total_data_size = 0;
size_t limit_size = table_rows / 10 * 8;
for (const auto & column_info : mockColumnInfosToTiDBColumnInfos(column_infos))
{
ColumnGeneratorOpts opts{table_rows, getDataTypeByColumnInfoForComputingLayer(column_info)->getName(), RANDOM, column_info.name, 5};
column_data.push_back(ColumnGenerator::instance().generate(opts));
total_data_size += column_data.back().column->byteSize();
}
context.addMockTable("spill_sort_test", "collation_table", column_infos, column_data, 8);

MockOrderByItemVec order_by_items{std::make_pair("a", true), std::make_pair("b", true), std::make_pair("c", true), std::make_pair("d", true), std::make_pair("e", true)};
std::vector<Int64> collators{TiDB::ITiDBCollator::UTF8MB4_BIN, TiDB::ITiDBCollator::UTF8MB4_GENERAL_CI, TiDB::ITiDBCollator::UTF8MB4_UNICODE_CI};
for (const auto & collator_id : collators)
{
context.setCollation(collator_id);
auto request = context
.scan("spill_sort_test", "collation_table")
.topN(order_by_items, limit_size)
.build(context);
context.context->setSetting("max_block_size", Field(static_cast<UInt64>(max_block_size)));
/// disable spill
context.context->setSetting("max_bytes_before_external_sort", Field(static_cast<UInt64>(0)));
auto ref_columns = executeStreams(request, original_max_streams);
/// enable spill
context.context->setSetting("max_bytes_before_external_sort", Field(static_cast<UInt64>(total_data_size / 10)));
// don't use `executeAndAssertColumnsEqual` since it takes too long to run
/// todo use ASSERT_COLUMNS_EQ_R once TiFlash support final TopN
ASSERT_COLUMNS_EQ_UR(ref_columns, executeStreams(request, original_max_streams));
/// enable spill and use small max_cached_data_bytes_in_spiller
context.context->setSetting("max_cached_data_bytes_in_spiller", Field(static_cast<UInt64>(total_data_size / 100)));
ASSERT_COLUMNS_EQ_UR(ref_columns, executeStreams(request, original_max_streams));
}
}
CATCH
} // namespace tests
} // namespace DB
3 changes: 2 additions & 1 deletion dbms/src/Interpreters/Aggregator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1766,7 +1766,8 @@ void NO_INLINE Aggregator::mergeStreamsImplCase(
std::vector<std::string> sort_key_containers;
sort_key_containers.resize(params.keys_size, "");

typename Method::State state(key_columns, key_sizes, params.collators);
/// in merge stage, don't need to care about the collator because the key is already the sort_key of original string
typename Method::State state(key_columns, key_sizes, {});

/// For all rows.
size_t rows = block.rows();
Expand Down
Loading

0 comments on commit 35e2f8d

Please sign in to comment.