Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support scan concurrency hint for mock table #6591

Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 35 additions & 11 deletions dbms/src/Debug/MockStorage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,11 @@ void MockStorage::addTableData(const String & name, ColumnsWithTypeAndName & col
table_columns[getTableId(name)] = columns;
}

void MockStorage::addTableScanConcurrencyHint(const String & name, size_t concurrency_hint)
{
table_scan_concurrency_hint[getTableId(name)] = concurrency_hint;
}

Int64 MockStorage::getTableId(const String & name)
{
if (name_to_id_map.find(name) != name_to_id_map.end())
Expand All @@ -67,6 +72,15 @@ ColumnsWithTypeAndName MockStorage::getColumns(Int64 table_id)
throw Exception(fmt::format("Failed to get columns by table_id '{}'", table_id));
}

/// Returns the scan concurrency hint registered for `table_id`.
/// Returns 0 when the table does not exist or when no hint was registered for it;
/// the mock table scan builders treat 0 as "no hint" and keep their original concurrency.
size_t MockStorage::getScanConcurrencyHint(Int64 table_id)
{
    if (!tableExists(table_id))
        return 0;

    // Use find() instead of operator[]: operator[] would default-insert a zero entry
    // for every table that has no hint, so a plain read would mutate the map.
    const auto hint_it = table_scan_concurrency_hint.find(table_id);
    if (hint_it == table_scan_concurrency_hint.end())
        return 0;
    return hint_it->second;
}

MockColumnInfoVec MockStorage::getTableSchema(const String & name)
{
if (tableExists(getTableId(name)))
Expand Down Expand Up @@ -366,17 +380,8 @@ void MockStorage::addTableInfo(const String & name, const MockColumnInfoVec & co
TableInfo table_info;
table_info.name = name;
table_info.id = getTableId(name);
int i = 0;
for (const auto & column : columns)
{
TiDB::ColumnInfo ret;
std::tie(ret.name, ret.tp) = column;
// TODO: find a way to assign decimal field's flen.
if (ret.tp == TiDB::TP::TypeNewDecimal)
ret.flen = 65;
ret.id = i++;
table_info.columns.push_back(std::move(ret));
}
auto column_infos = mockColumnInfosToTiDBColumnInfos(columns);
table_info.columns.swap(column_infos);
table_infos[name] = table_info;
}

Expand All @@ -389,4 +394,23 @@ TableInfo MockStorage::getTableInfoForDeltaMerge(const String & name)
{
return table_infos_for_delta_merge[name];
}

/// Converts mock column descriptions into TiDB column infos.
/// Each result takes its name and type from the corresponding mock entry, and column ids
/// are assigned sequentially from 0 in input order.
ColumnInfos mockColumnInfosToTiDBColumnInfos(const MockColumnInfoVec & mock_column_infos)
{
    ColumnInfos tidb_column_infos;
    tidb_column_infos.reserve(mock_column_infos.size());

    ColumnID next_column_id = 0;
    for (const auto & mock_info : mock_column_infos)
    {
        TiDB::ColumnInfo tidb_info;
        std::tie(tidb_info.name, tidb_info.tp) = mock_info;
        tidb_info.id = next_column_id;
        ++next_column_id;

        // TODO: find a way to assign decimal field's flen.
        const bool is_decimal = (tidb_info.tp == TiDB::TP::TypeNewDecimal);
        if (is_decimal)
        {
            tidb_info.flen = 65;
        }

        tidb_column_infos.push_back(std::move(tidb_info));
    }
    return tidb_column_infos;
}

} // namespace DB
9 changes: 9 additions & 0 deletions dbms/src/Debug/MockStorage.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ class MockTableIdGenerator : public ext::Singleton<MockTableIdGenerator>
std::atomic<Int64> current_id = 0;
};

/// Converts mock column infos into TiDB column infos.
/// Column ids are assigned sequentially starting from 0, in input order;
/// columns of type TypeNewDecimal get flen = 65.
ColumnInfos mockColumnInfosToTiDBColumnInfos(const MockColumnInfoVec & mock_column_infos);

/** Responsible for mock data for executor tests and mpp tests.
* 1. Use this class to add mock table schema and table column data.
* 2. Use this class to add mock exchange schema and exchange column data.
Expand All @@ -58,10 +60,14 @@ class MockStorage

void addTableData(const String & name, ColumnsWithTypeAndName & columns);

void addTableScanConcurrencyHint(const String & name, size_t concurrency_hint);

MockColumnInfoVec getTableSchema(const String & name);

ColumnsWithTypeAndName getColumns(Int64 table_id);

size_t getScanConcurrencyHint(Int64 table_id);

bool tableExists(Int64 table_id);

/// for storage delta merge table scan
Expand Down Expand Up @@ -98,6 +104,8 @@ class MockStorage
TableInfo getTableInfo(const String & name);
TableInfo getTableInfoForDeltaMerge(const String & name);

size_t getTableScanConcurrencyHint(const TiDBTableScan & table_scan);

/// clear for StorageDeltaMerge
void clear();

Expand All @@ -111,6 +119,7 @@ class MockStorage
std::unordered_map<Int64, MockColumnInfoVec> table_schema; /// <table_id, columnInfo>
std::unordered_map<Int64, ColumnsWithTypeAndName> table_columns; /// <table_id, columns>
std::unordered_map<String, TableInfo> table_infos; /// <table_name, table_info>
std::unordered_map<Int64, size_t> table_scan_concurrency_hint; /// <table_id, concurrency_hint>

/// for mock exchange receiver
std::unordered_map<String, String> executor_id_to_name_map; /// <executor_id, exchange name>
Expand Down
21 changes: 6 additions & 15 deletions dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -164,19 +164,7 @@ AnalysisResult analyzeExpressions(
// for tests, we need to mock tableScan blockInputStream as the source stream.
void DAGQueryBlockInterpreter::handleMockTableScan(const TiDBTableScan & table_scan, DAGPipeline & pipeline)
{
// Interpreter test will not use columns in MockStorage
if (context.isInterpreterTest())
{
auto names_and_types = genNamesAndTypes(table_scan, "mock_table_scan");
auto columns_with_type_and_name = getColumnWithTypeAndName(names_and_types);
analyzer = std::make_unique<DAGExpressionAnalyzer>(std::move(names_and_types), context);
for (size_t i = 0; i < max_streams; ++i)
{
auto mock_table_scan_stream = std::make_shared<MockTableScanBlockInputStream>(columns_with_type_and_name, context.getSettingsRef().max_block_size);
pipeline.streams.emplace_back(mock_table_scan_stream);
}
}
else if (context.mockStorage()->useDeltaMerge())
if (context.mockStorage()->useDeltaMerge())
{
assert(context.mockStorage()->tableExistsForDeltaMerge(table_scan.getLogicalTableID()));
auto names_and_types = context.mockStorage()->getNameAndTypesForDeltaMerge(table_scan.getLogicalTableID());
Expand All @@ -187,16 +175,19 @@ void DAGQueryBlockInterpreter::handleMockTableScan(const TiDBTableScan & table_s
else
{
/// build from user input blocks.
size_t scan_concurrency = max_streams;
size_t concurrency_hint = context.mockStorage()->getScanConcurrencyHint(table_scan.getLogicalTableID());
scan_concurrency = concurrency_hint == 0 ? scan_concurrency : std::min(scan_concurrency, concurrency_hint);
assert(context.mockStorage()->tableExists(table_scan.getLogicalTableID()));
NamesAndTypes names_and_types;
std::vector<std::shared_ptr<DB::MockTableScanBlockInputStream>> mock_table_scan_streams;
if (context.isMPPTest())
{
std::tie(names_and_types, mock_table_scan_streams) = mockSourceStreamForMpp(context, max_streams, log, table_scan);
std::tie(names_and_types, mock_table_scan_streams) = mockSourceStreamForMpp(context, scan_concurrency, log, table_scan);
}
else
{
std::tie(names_and_types, mock_table_scan_streams) = mockSourceStream<MockTableScanBlockInputStream>(context, max_streams, log, table_scan.getTableScanExecutorID(), table_scan.getLogicalTableID());
std::tie(names_and_types, mock_table_scan_streams) = mockSourceStream<MockTableScanBlockInputStream>(context, scan_concurrency, log, table_scan.getTableScanExecutorID(), table_scan.getLogicalTableID());
}

analyzer = std::make_unique<DAGExpressionAnalyzer>(std::move(names_and_types), context);
Expand Down
14 changes: 9 additions & 5 deletions dbms/src/Flash/Coprocessor/GenSchemaAndColumn.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,13 +56,13 @@ String genNameForExchangeReceiver(Int32 col_index)
return "exchange_receiver_" + std::to_string(col_index);
}

NamesAndTypes genNamesAndTypes(const TiDBTableScan & table_scan, const StringRef & column_prefix)
NamesAndTypes genNamesAndTypes(const ColumnInfos & column_infos, const StringRef & column_prefix)
{
NamesAndTypes names_and_types;
names_and_types.reserve(table_scan.getColumnSize());
for (Int32 i = 0; i < table_scan.getColumnSize(); ++i)
names_and_types.reserve(column_infos.size());
for (size_t i = 0; i < column_infos.size(); ++i)
{
const auto column_info = table_scan.getColumns()[i];
const auto & column_info = column_infos[i];
switch (column_info.id)
{
case TiDBPkColumnID:
Expand All @@ -72,11 +72,15 @@ NamesAndTypes genNamesAndTypes(const TiDBTableScan & table_scan, const StringRef
names_and_types.emplace_back(MutableSupport::extra_table_id_column_name, MutableSupport::extra_table_id_column_type);
break;
default:
names_and_types.emplace_back(fmt::format("{}_{}", column_prefix, i), getDataTypeByColumnInfoForComputingLayer(column_info));
names_and_types.emplace_back(column_info.name.empty() ? fmt::format("{}_{}", column_prefix, i) : column_info.name, getDataTypeByColumnInfoForComputingLayer(column_info));
}
}
return names_and_types;
}
/// Convenience overload: generates names and types from the columns of `table_scan`
/// by forwarding to the ColumnInfos-based overload with the same `column_prefix`.
NamesAndTypes genNamesAndTypes(const TiDBTableScan & table_scan, const StringRef & column_prefix)
{
    const auto & scan_columns = table_scan.getColumns();
    return genNamesAndTypes(scan_columns, column_prefix);
}

ColumnsWithTypeAndName getColumnWithTypeAndName(const NamesAndTypes & names_and_types)
{
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Flash/Coprocessor/GenSchemaAndColumn.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ NamesAndTypes genNamesAndTypesForTableScan(const TiDBTableScan & table_scan);
String genNameForExchangeReceiver(Int32 col_index);

NamesAndTypes genNamesAndTypes(const TiDBTableScan & table_scan, const StringRef & column_prefix);
NamesAndTypes genNamesAndTypes(const ColumnInfos & column_infos, const StringRef & column_prefix);
ColumnsWithTypeAndName getColumnWithTypeAndName(const NamesAndTypes & names_and_types);
NamesAndTypes toNamesAndTypes(const DAGSchema & dag_schema);
} // namespace DB
12 changes: 3 additions & 9 deletions dbms/src/Flash/Planner/plans/PhysicalMockTableScan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,17 +36,11 @@ std::pair<NamesAndTypes, BlockInputStreams> mockSchemaAndStreams(
BlockInputStreams mock_streams;
auto & dag_context = *context.getDAGContext();
size_t max_streams = dag_context.initialize_concurrency;
size_t concurrency_hint = context.mockStorage()->getScanConcurrencyHint(table_scan.getLogicalTableID());
max_streams = concurrency_hint == 0 ? max_streams : std::min(max_streams, concurrency_hint);
assert(max_streams > 0);

// Interpreter test will not use columns in MockStorage
if (context.isInterpreterTest())
{
schema = genNamesAndTypes(table_scan, "mock_table_scan");
auto columns_with_type_and_name = getColumnWithTypeAndName(schema);
for (size_t i = 0; i < max_streams; ++i)
mock_streams.emplace_back(std::make_shared<MockTableScanBlockInputStream>(columns_with_type_and_name, context.getSettingsRef().max_block_size));
}
else if (context.mockStorage()->useDeltaMerge())
if (context.mockStorage()->useDeltaMerge())
{
assert(context.mockStorage()->tableExistsForDeltaMerge(table_scan.getLogicalTableID()));
schema = context.mockStorage()->getNameAndTypesForDeltaMerge(table_scan.getLogicalTableID());
Expand Down
81 changes: 81 additions & 0 deletions dbms/src/Flash/tests/gtest_scan_concurrency_hint.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
// Copyright 2023 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <TestUtils/ExecutorTestUtils.h>

#include <ext/enumerate.h>
#include <tuple>

namespace DB
{
namespace tests
{
/// Test fixture for the mock table scan concurrency hint.
/// It needs no setup beyond what ExecutorTest already provides.
class ScanConcurrencyHintTest : public DB::tests::ExecutorTest
{
public:
    /// Delegates straight to the base fixture; the tables are registered inside each test case.
    void initializeContext() override { ExecutorTest::initializeContext(); }
};

/// A concurrency hint of 0 is treated as "no hint": the mock table scan must keep
/// the concurrency requested by the test (10 here).
TEST_F(ScanConcurrencyHintTest, InvalidHint)
try
{
/// Register table simple_test.t1 with two string columns and a concurrency hint of 0 (last argument).
context.addMockTable("simple_test", "t1", {{"a", TiDB::TP::TypeString}, {"b", TiDB::TP::TypeString}}, {toNullableVec<String>("a", {"1", "2", {}, "1", {}}), toNullableVec<String>("b", {"3", "4", "3", {}, {}})}, 0);
auto request = context.scan("simple_test", "t1").build(context);
{
/// The hint (0) is invalid, so it is ignored: with a requested concurrency of 10
/// the plan is expected to contain 10 parallel streams.
String expected = R"(
Union: <for test>
Expression x 10: <final projection>
MockTableScan)";
ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 10);
}
}
CATCH

/// A non-zero concurrency hint caps the scan concurrency: the resulting number of
/// streams is min(requested concurrency, hint). The hint used here is 3.
TEST_F(ScanConcurrencyHintTest, ValidHint)
try
{
/// Register table simple_test.t1 with two string columns and a concurrency hint of 3 (last argument).
context.addMockTable("simple_test", "t1", {{"a", TiDB::TP::TypeString}, {"b", TiDB::TP::TypeString}}, {toNullableVec<String>("a", {"1", "2", {}, "1", {}}), toNullableVec<String>("b", {"3", "4", "3", {}, {}})}, 3);
auto request = context.scan("simple_test", "t1").build(context);
{
/// Case 1: requested concurrency (1) < hint (3). The hint does not raise concurrency,
/// so the requested concurrency of 1 is kept.
String expected = R"(
Expression: <final projection>
MockTableScan)";
ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 1);
}
{
/// Case 2: requested concurrency (3) == hint (3). The requested concurrency is kept.
String expected = R"(
Union: <for test>
Expression x 3: <final projection>
MockTableScan)";
ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 3);
}
{
/// Case 3: requested concurrency (10) > hint (3). The concurrency is capped at the hint,
/// so 3 streams are expected.
String expected = R"(
Union: <for test>
Expression x 3: <final projection>
MockTableScan)";
ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 10);
}
}
CATCH

} // namespace tests
} // namespace DB
1 change: 1 addition & 0 deletions dbms/src/TestUtils/ExecutorTestUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ void ExecutorTest::executeInterpreter(const String & expected_string, const std:
DAGContext dag_context(*request, "interpreter_test", concurrency);
context.context.setDAGContext(&dag_context);
context.context.setInterpreterTest();
context.context.setMockStorage(context.mockStorage());

// Don't care regions information in interpreter tests.
auto query_executor = queryExecute(context.context, /*internal=*/true);
Expand Down
37 changes: 30 additions & 7 deletions dbms/src/TestUtils/mockExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -360,16 +360,37 @@ DAGRequestBuilder & DAGRequestBuilder::sort(MockOrderByItemVec order_by_vec, boo
return *this;
}

void MockDAGRequestContext::addMockTable(const String & db, const String & table, const MockColumnInfoVec & columnInfos)
void MockDAGRequestContext::addMockTable(const String & db, const String & table, const MockColumnInfoVec & mock_column_infos)
{
mock_storage->addTableSchema(db + "." + table, columnInfos);
auto columns = getColumnWithTypeAndName(genNamesAndTypes(mockColumnInfosToTiDBColumnInfos(mock_column_infos), "mock_table_scan"));
addMockTable(db, table, mock_column_infos, columns);
}

void MockDAGRequestContext::addMockTable(const MockTableName & name, const MockColumnInfoVec & columnInfos)
/// Registers only the schema of mock table `db`.`table` in the mock storage.
/// The table is keyed by the qualified name "<db>.<table>".
void MockDAGRequestContext::addMockTableSchema(const String & db, const String & table, const MockColumnInfoVec & columnInfos)
{
    const auto qualified_name = db + "." + table;
    mock_storage->addTableSchema(qualified_name, columnInfos);
}
/// Registers only the schema of the mock table identified by `name` (first = db, second = table).
/// The table is keyed by the qualified name "<db>.<table>".
void MockDAGRequestContext::addMockTableSchema(const MockTableName & name, const MockColumnInfoVec & columnInfos)
{
    const auto & db = name.first;
    const auto & table = name.second;
    mock_storage->addTableSchema(db + "." + table, columnInfos);
}

/// Adds a mock table from its schema alone.
/// The column data is generated from the schema (mock infos -> TiDB column infos -> names and
/// types -> columns), then the table is registered through the overload that takes columns.
void MockDAGRequestContext::addMockTable(const MockTableName & name, const MockColumnInfoVec & mock_column_infos)
{
    const auto tidb_column_infos = mockColumnInfosToTiDBColumnInfos(mock_column_infos);
    // "mock_table_scan" is the column-name prefix passed on to genNamesAndTypes.
    const auto names_and_types = genNamesAndTypes(tidb_column_infos, "mock_table_scan");
    auto columns = getColumnWithTypeAndName(names_and_types);
    addMockTable(name, mock_column_infos, columns);
}

/// Registers the scan concurrency hint for mock table `db`.`table`.
/// The table is keyed by the qualified name "<db>.<table>".
void MockDAGRequestContext::addMockTableConcurrencyHint(const String & db, const String & table, size_t concurrency_hint)
{
    const auto qualified_name = db + "." + table;
    mock_storage->addTableScanConcurrencyHint(qualified_name, concurrency_hint);
}

/// Registers the scan concurrency hint for the mock table identified by `name`
/// (first = db, second = table). The table is keyed by the qualified name "<db>.<table>".
void MockDAGRequestContext::addMockTableConcurrencyHint(const MockTableName & name, size_t concurrency_hint)
{
    const auto & db = name.first;
    const auto & table = name.second;
    mock_storage->addTableScanConcurrencyHint(db + "." + table, concurrency_hint);
}

void MockDAGRequestContext::addExchangeRelationSchema(String name, const MockColumnInfoVec & columnInfos)
{
mock_storage->addExchangeSchema(name, columnInfos);
Expand Down Expand Up @@ -398,20 +419,22 @@ void MockDAGRequestContext::addExchangeReceiverColumnData(const String & name, C
mock_storage->addExchangeData(name, columns);
}

void MockDAGRequestContext::addMockTable(const String & db, const String & table, const MockColumnInfoVec & columnInfos, ColumnsWithTypeAndName columns)
void MockDAGRequestContext::addMockTable(const String & db, const String & table, const MockColumnInfoVec & columnInfos, ColumnsWithTypeAndName columns, size_t concurrency_hint)
{
assertMockInput(columnInfos, columns);

addMockTable(db, table, columnInfos);
addMockTableSchema(db, table, columnInfos);
addMockTableColumnData(db, table, columns);
addMockTableConcurrencyHint(db, table, concurrency_hint);
}

void MockDAGRequestContext::addMockTable(const MockTableName & name, const MockColumnInfoVec & columnInfos, ColumnsWithTypeAndName columns)
void MockDAGRequestContext::addMockTable(const MockTableName & name, const MockColumnInfoVec & columnInfos, ColumnsWithTypeAndName columns, size_t concurrency_hint)
{
assertMockInput(columnInfos, columns);

addMockTable(name, columnInfos);
addMockTableSchema(name, columnInfos);
addMockTableColumnData(name, columns);
addMockTableConcurrencyHint(name, concurrency_hint);
}

void MockDAGRequestContext::addMockDeltaMergeSchema(const String & db, const String & table, const MockColumnInfoVec & columnInfos)
Expand Down
8 changes: 6 additions & 2 deletions dbms/src/TestUtils/mockExecutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -180,9 +180,13 @@ class MockDAGRequestContext
void addMockTable(const MockTableName & name, const MockColumnInfoVec & columnInfos);
void addMockTableColumnData(const String & db, const String & table, ColumnsWithTypeAndName columns);
void addMockTableColumnData(const MockTableName & name, ColumnsWithTypeAndName columns);
void addMockTableSchema(const String & db, const String & table, const MockColumnInfoVec & columnInfos);
void addMockTableSchema(const MockTableName & name, const MockColumnInfoVec & columnInfos);
void addMockTableConcurrencyHint(const String & db, const String & table, size_t concurrency_hint);
void addMockTableConcurrencyHint(const MockTableName & name, size_t concurrency_hint);

void addMockTable(const String & db, const String & table, const MockColumnInfoVec & columnInfos, ColumnsWithTypeAndName columns);
void addMockTable(const MockTableName & name, const MockColumnInfoVec & columnInfos, ColumnsWithTypeAndName columns);
void addMockTable(const String & db, const String & table, const MockColumnInfoVec & columnInfos, ColumnsWithTypeAndName columns, size_t concurrency_hint = 0);
void addMockTable(const MockTableName & name, const MockColumnInfoVec & columnInfos, ColumnsWithTypeAndName columns, size_t concurrency_hint = 0);

/// mock DeltaMerge table scan
void addMockDeltaMerge(const MockTableName & name, const MockColumnInfoVec & columnInfos, ColumnsWithTypeAndName columns);
Expand Down