Skip to content

Commit

Permalink
support scan concurrency hint for mock table (#6591)
Browse files Browse the repository at this point in the history
ref #6528
  • Loading branch information
windtalker authored Jan 6, 2023
1 parent 3350580 commit 20c5a07
Show file tree
Hide file tree
Showing 12 changed files with 186 additions and 50 deletions.
46 changes: 35 additions & 11 deletions dbms/src/Debug/MockStorage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,11 @@ void MockStorage::addTableData(const String & name, ColumnsWithTypeAndName & col
table_columns[getTableId(name)] = columns;
}

void MockStorage::addTableScanConcurrencyHint(const String & name, size_t concurrency_hint)
{
table_scan_concurrency_hint[getTableId(name)] = concurrency_hint;
}

Int64 MockStorage::getTableId(const String & name)
{
if (name_to_id_map.find(name) != name_to_id_map.end())
Expand All @@ -67,6 +72,15 @@ ColumnsWithTypeAndName MockStorage::getColumns(Int64 table_id)
throw Exception(fmt::format("Failed to get columns by table_id '{}'", table_id));
}

size_t MockStorage::getScanConcurrencyHint(Int64 table_id)
{
if (tableExists(table_id))
{
return table_scan_concurrency_hint[table_id];
}
return 0;
}

MockColumnInfoVec MockStorage::getTableSchema(const String & name)
{
if (tableExists(getTableId(name)))
Expand Down Expand Up @@ -366,17 +380,8 @@ void MockStorage::addTableInfo(const String & name, const MockColumnInfoVec & co
TableInfo table_info;
table_info.name = name;
table_info.id = getTableId(name);
int i = 0;
for (const auto & column : columns)
{
TiDB::ColumnInfo ret;
std::tie(ret.name, ret.tp) = column;
// TODO: find a way to assign decimal field's flen.
if (ret.tp == TiDB::TP::TypeNewDecimal)
ret.flen = 65;
ret.id = i++;
table_info.columns.push_back(std::move(ret));
}
auto column_infos = mockColumnInfosToTiDBColumnInfos(columns);
table_info.columns.swap(column_infos);
table_infos[name] = table_info;
}

Expand All @@ -389,4 +394,23 @@ TableInfo MockStorage::getTableInfoForDeltaMerge(const String & name)
{
return table_infos_for_delta_merge[name];
}

ColumnInfos mockColumnInfosToTiDBColumnInfos(const MockColumnInfoVec & mock_column_infos)
{
ColumnID col_id = 0;
ColumnInfos ret;
ret.reserve(mock_column_infos.size());
for (const auto & mock_column_info : mock_column_infos)
{
TiDB::ColumnInfo column_info;
std::tie(column_info.name, column_info.tp) = mock_column_info;
column_info.id = col_id++;
// TODO: find a way to assign decimal field's flen.
if (column_info.tp == TiDB::TP::TypeNewDecimal)
column_info.flen = 65;
ret.push_back(std::move(column_info));
}
return ret;
}

} // namespace DB
9 changes: 9 additions & 0 deletions dbms/src/Debug/MockStorage.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ class MockTableIdGenerator : public ext::Singleton<MockTableIdGenerator>
std::atomic<Int64> current_id = 0;
};

ColumnInfos mockColumnInfosToTiDBColumnInfos(const MockColumnInfoVec & mock_column_infos);

/** Responsible for mock data for executor tests and mpp tests.
* 1. Use this class to add mock table schema and table column data.
* 2. Use this class to add mock exchange schema and exchange column data.
Expand All @@ -58,10 +60,14 @@ class MockStorage

void addTableData(const String & name, ColumnsWithTypeAndName & columns);

void addTableScanConcurrencyHint(const String & name, size_t concurrency_hint);

MockColumnInfoVec getTableSchema(const String & name);

ColumnsWithTypeAndName getColumns(Int64 table_id);

size_t getScanConcurrencyHint(Int64 table_id);

bool tableExists(Int64 table_id);

/// for storage delta merge table scan
Expand Down Expand Up @@ -98,6 +104,8 @@ class MockStorage
TableInfo getTableInfo(const String & name);
TableInfo getTableInfoForDeltaMerge(const String & name);

size_t getTableScanConcurrencyHint(const TiDBTableScan & table_scan);

/// clear for StorageDeltaMerge
void clear();

Expand All @@ -111,6 +119,7 @@ class MockStorage
std::unordered_map<Int64, MockColumnInfoVec> table_schema; /// <table_id, columnInfo>
std::unordered_map<Int64, ColumnsWithTypeAndName> table_columns; /// <table_id, columns>
std::unordered_map<String, TableInfo> table_infos; /// <table_name, table_info>
std::unordered_map<Int64, size_t> table_scan_concurrency_hint; /// <table_id, concurrency_hint>

/// for mock exchange receiver
std::unordered_map<String, String> executor_id_to_name_map; /// <executor_id, exchange name>
Expand Down
19 changes: 4 additions & 15 deletions dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -164,19 +164,7 @@ AnalysisResult analyzeExpressions(
// for tests, we need to mock tableScan blockInputStream as the source stream.
void DAGQueryBlockInterpreter::handleMockTableScan(const TiDBTableScan & table_scan, DAGPipeline & pipeline)
{
// Interpreter test will not use columns in MockStorage
if (context.isInterpreterTest())
{
auto names_and_types = genNamesAndTypes(table_scan, "mock_table_scan");
auto columns_with_type_and_name = getColumnWithTypeAndName(names_and_types);
analyzer = std::make_unique<DAGExpressionAnalyzer>(std::move(names_and_types), context);
for (size_t i = 0; i < max_streams; ++i)
{
auto mock_table_scan_stream = std::make_shared<MockTableScanBlockInputStream>(columns_with_type_and_name, context.getSettingsRef().max_block_size);
pipeline.streams.emplace_back(mock_table_scan_stream);
}
}
else if (context.mockStorage()->useDeltaMerge())
if (context.mockStorage()->useDeltaMerge())
{
assert(context.mockStorage()->tableExistsForDeltaMerge(table_scan.getLogicalTableID()));
auto names_and_types = context.mockStorage()->getNameAndTypesForDeltaMerge(table_scan.getLogicalTableID());
Expand All @@ -187,16 +175,17 @@ void DAGQueryBlockInterpreter::handleMockTableScan(const TiDBTableScan & table_s
else
{
/// build from user input blocks.
size_t scan_concurrency = getMockSourceStreamConcurrency(max_streams, context.mockStorage()->getScanConcurrencyHint(table_scan.getLogicalTableID()));
assert(context.mockStorage()->tableExists(table_scan.getLogicalTableID()));
NamesAndTypes names_and_types;
std::vector<std::shared_ptr<DB::MockTableScanBlockInputStream>> mock_table_scan_streams;
if (context.isMPPTest())
{
std::tie(names_and_types, mock_table_scan_streams) = mockSourceStreamForMpp(context, max_streams, log, table_scan);
std::tie(names_and_types, mock_table_scan_streams) = mockSourceStreamForMpp(context, scan_concurrency, log, table_scan);
}
else
{
std::tie(names_and_types, mock_table_scan_streams) = mockSourceStream<MockTableScanBlockInputStream>(context, max_streams, log, table_scan.getTableScanExecutorID(), table_scan.getLogicalTableID());
std::tie(names_and_types, mock_table_scan_streams) = mockSourceStream<MockTableScanBlockInputStream>(context, scan_concurrency, log, table_scan.getTableScanExecutorID(), table_scan.getLogicalTableID());
}

analyzer = std::make_unique<DAGExpressionAnalyzer>(std::move(names_and_types), context);
Expand Down
14 changes: 9 additions & 5 deletions dbms/src/Flash/Coprocessor/GenSchemaAndColumn.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,13 +56,13 @@ String genNameForExchangeReceiver(Int32 col_index)
return "exchange_receiver_" + std::to_string(col_index);
}

NamesAndTypes genNamesAndTypes(const TiDBTableScan & table_scan, const StringRef & column_prefix)
NamesAndTypes genNamesAndTypes(const ColumnInfos & column_infos, const StringRef & column_prefix)
{
NamesAndTypes names_and_types;
names_and_types.reserve(table_scan.getColumnSize());
for (Int32 i = 0; i < table_scan.getColumnSize(); ++i)
names_and_types.reserve(column_infos.size());
for (size_t i = 0; i < column_infos.size(); ++i)
{
const auto column_info = table_scan.getColumns()[i];
const auto & column_info = column_infos[i];
switch (column_info.id)
{
case TiDBPkColumnID:
Expand All @@ -72,11 +72,15 @@ NamesAndTypes genNamesAndTypes(const TiDBTableScan & table_scan, const StringRef
names_and_types.emplace_back(MutableSupport::extra_table_id_column_name, MutableSupport::extra_table_id_column_type);
break;
default:
names_and_types.emplace_back(fmt::format("{}_{}", column_prefix, i), getDataTypeByColumnInfoForComputingLayer(column_info));
names_and_types.emplace_back(column_info.name.empty() ? fmt::format("{}_{}", column_prefix, i) : column_info.name, getDataTypeByColumnInfoForComputingLayer(column_info));
}
}
return names_and_types;
}
NamesAndTypes genNamesAndTypes(const TiDBTableScan & table_scan, const StringRef & column_prefix)
{
return genNamesAndTypes(table_scan.getColumns(), column_prefix);
}

ColumnsWithTypeAndName getColumnWithTypeAndName(const NamesAndTypes & names_and_types)
{
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Flash/Coprocessor/GenSchemaAndColumn.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ NamesAndTypes genNamesAndTypesForTableScan(const TiDBTableScan & table_scan);
String genNameForExchangeReceiver(Int32 col_index);

NamesAndTypes genNamesAndTypes(const TiDBTableScan & table_scan, const StringRef & column_prefix);
NamesAndTypes genNamesAndTypes(const ColumnInfos & column_infos, const StringRef & column_prefix);
ColumnsWithTypeAndName getColumnWithTypeAndName(const NamesAndTypes & names_and_types);
NamesAndTypes toNamesAndTypes(const DAGSchema & dag_schema);
} // namespace DB
6 changes: 6 additions & 0 deletions dbms/src/Flash/Coprocessor/MockSourceStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,10 @@ std::pair<NamesAndTypes, std::vector<std::shared_ptr<MockTableScanBlockInputStre
ColumnsWithTypeAndName columns_with_type_and_name = context.mockStorage()->getColumnsForMPPTableScan(table_scan, context.mockMPPServerInfo().partition_id, context.mockMPPServerInfo().partition_num);
return cutStreams<MockTableScanBlockInputStream>(context, columns_with_type_and_name, max_streams, log);
}
size_t getMockSourceStreamConcurrency(size_t max_streams, size_t scan_concurrency_hint)
{
if (scan_concurrency_hint == 0)
return max_streams;
return std::max(std::min(max_streams, scan_concurrency_hint), 1);
}
} // namespace DB
2 changes: 2 additions & 0 deletions dbms/src/Flash/Coprocessor/MockSourceStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ std::pair<NamesAndTypes, std::vector<std::shared_ptr<SourceType>>> cutStreams(Co

std::pair<NamesAndTypes, std::vector<std::shared_ptr<MockTableScanBlockInputStream>>> mockSourceStreamForMpp(Context & context, size_t max_streams, DB::LoggerPtr log, const TiDBTableScan & table_scan);

size_t getMockSourceStreamConcurrency(size_t max_streams, size_t scan_concurrency_hint);

template <typename SourceType>
std::pair<NamesAndTypes, std::vector<std::shared_ptr<SourceType>>> mockSourceStream(Context & context, size_t max_streams, DB::LoggerPtr log, String executor_id, Int64 table_id = 0)
{
Expand Down
12 changes: 2 additions & 10 deletions dbms/src/Flash/Planner/plans/PhysicalMockTableScan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,18 +35,10 @@ std::pair<NamesAndTypes, BlockInputStreams> mockSchemaAndStreams(
NamesAndTypes schema;
BlockInputStreams mock_streams;
auto & dag_context = *context.getDAGContext();
size_t max_streams = dag_context.initialize_concurrency;
size_t max_streams = getMockSourceStreamConcurrency(dag_context.initialize_concurrency, context.mockStorage()->getScanConcurrencyHint(table_scan.getLogicalTableID()));
assert(max_streams > 0);

// Interpreter test will not use columns in MockStorage
if (context.isInterpreterTest())
{
schema = genNamesAndTypes(table_scan, "mock_table_scan");
auto columns_with_type_and_name = getColumnWithTypeAndName(schema);
for (size_t i = 0; i < max_streams; ++i)
mock_streams.emplace_back(std::make_shared<MockTableScanBlockInputStream>(columns_with_type_and_name, context.getSettingsRef().max_block_size));
}
else if (context.mockStorage()->useDeltaMerge())
if (context.mockStorage()->useDeltaMerge())
{
assert(context.mockStorage()->tableExistsForDeltaMerge(table_scan.getLogicalTableID()));
schema = context.mockStorage()->getNameAndTypesForDeltaMerge(table_scan.getLogicalTableID());
Expand Down
81 changes: 81 additions & 0 deletions dbms/src/Flash/tests/gtest_scan_concurrency_hint.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
// Copyright 2023 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <TestUtils/ExecutorTestUtils.h>

#include <ext/enumerate.h>
#include <tuple>

namespace DB
{
namespace tests
{
class ScanConcurrencyHintTest : public DB::tests::ExecutorTest
{
public:
void initializeContext() override
{
ExecutorTest::initializeContext();
}
};

TEST_F(ScanConcurrencyHintTest, InvalidHint)
try
{
context.addMockTable("simple_test", "t1", {{"a", TiDB::TP::TypeString}, {"b", TiDB::TP::TypeString}}, {toNullableVec<String>("a", {"1", "2", {}, "1", {}}), toNullableVec<String>("b", {"3", "4", "3", {}, {}})}, 0);
auto request = context.scan("simple_test", "t1").build(context);
{
/// the scan concurrency hint is invalid, the final stream concurrency is the original concurrency
String expected = R"(
Union: <for test>
Expression x 10: <final projection>
MockTableScan)";
ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 10);
}
}
CATCH

TEST_F(ScanConcurrencyHintTest, ValidHint)
try
{
context.addMockTable("simple_test", "t1", {{"a", TiDB::TP::TypeString}, {"b", TiDB::TP::TypeString}}, {toNullableVec<String>("a", {"1", "2", {}, "1", {}}), toNullableVec<String>("b", {"3", "4", "3", {}, {}})}, 3);
auto request = context.scan("simple_test", "t1").build(context);
{
/// case 1, concurrency < scan concurrency hint, the final stream concurrency is the original concurrency
String expected = R"(
Expression: <final projection>
MockTableScan)";
ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 1);
}
{
/// case 2, concurrency = scan concurrency hint, the final stream concurrency is the original concurrency
String expected = R"(
Union: <for test>
Expression x 3: <final projection>
MockTableScan)";
ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 3);
}
{
/// case 3, concurrency > scan concurrency hint, the final stream concurrency is the scan concurrency hint
String expected = R"(
Union: <for test>
Expression x 3: <final projection>
MockTableScan)";
ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 10);
}
}
CATCH

} // namespace tests
} // namespace DB
1 change: 1 addition & 0 deletions dbms/src/TestUtils/ExecutorTestUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ void ExecutorTest::executeInterpreter(const String & expected_string, const std:
DAGContext dag_context(*request, "interpreter_test", concurrency);
context.context.setDAGContext(&dag_context);
context.context.setInterpreterTest();
context.context.setMockStorage(context.mockStorage());

// Don't care regions information in interpreter tests.
auto query_executor = queryExecute(context.context, /*internal=*/true);
Expand Down
37 changes: 30 additions & 7 deletions dbms/src/TestUtils/mockExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -360,16 +360,37 @@ DAGRequestBuilder & DAGRequestBuilder::sort(MockOrderByItemVec order_by_vec, boo
return *this;
}

void MockDAGRequestContext::addMockTable(const String & db, const String & table, const MockColumnInfoVec & columnInfos)
void MockDAGRequestContext::addMockTable(const String & db, const String & table, const MockColumnInfoVec & mock_column_infos)
{
mock_storage->addTableSchema(db + "." + table, columnInfos);
auto columns = getColumnWithTypeAndName(genNamesAndTypes(mockColumnInfosToTiDBColumnInfos(mock_column_infos), "mock_table_scan"));
addMockTable(db, table, mock_column_infos, columns);
}

void MockDAGRequestContext::addMockTable(const MockTableName & name, const MockColumnInfoVec & columnInfos)
void MockDAGRequestContext::addMockTableSchema(const String & db, const String & table, const MockColumnInfoVec & columnInfos)
{
mock_storage->addTableSchema(db + "." + table, columnInfos);
}
void MockDAGRequestContext::addMockTableSchema(const MockTableName & name, const MockColumnInfoVec & columnInfos)
{
mock_storage->addTableSchema(name.first + "." + name.second, columnInfos);
}

void MockDAGRequestContext::addMockTable(const MockTableName & name, const MockColumnInfoVec & mock_column_infos)
{
auto columns = getColumnWithTypeAndName(genNamesAndTypes(mockColumnInfosToTiDBColumnInfos(mock_column_infos), "mock_table_scan"));
addMockTable(name, mock_column_infos, columns);
}

void MockDAGRequestContext::addMockTableConcurrencyHint(const String & db, const String & table, size_t concurrency_hint)
{
mock_storage->addTableScanConcurrencyHint(db + "." + table, concurrency_hint);
}

void MockDAGRequestContext::addMockTableConcurrencyHint(const MockTableName & name, size_t concurrency_hint)
{
mock_storage->addTableScanConcurrencyHint(name.first + "." + name.second, concurrency_hint);
}

void MockDAGRequestContext::addExchangeRelationSchema(String name, const MockColumnInfoVec & columnInfos)
{
mock_storage->addExchangeSchema(name, columnInfos);
Expand Down Expand Up @@ -398,20 +419,22 @@ void MockDAGRequestContext::addExchangeReceiverColumnData(const String & name, C
mock_storage->addExchangeData(name, columns);
}

void MockDAGRequestContext::addMockTable(const String & db, const String & table, const MockColumnInfoVec & columnInfos, ColumnsWithTypeAndName columns)
void MockDAGRequestContext::addMockTable(const String & db, const String & table, const MockColumnInfoVec & columnInfos, ColumnsWithTypeAndName columns, size_t concurrency_hint)
{
assertMockInput(columnInfos, columns);

addMockTable(db, table, columnInfos);
addMockTableSchema(db, table, columnInfos);
addMockTableColumnData(db, table, columns);
addMockTableConcurrencyHint(db, table, concurrency_hint);
}

void MockDAGRequestContext::addMockTable(const MockTableName & name, const MockColumnInfoVec & columnInfos, ColumnsWithTypeAndName columns)
void MockDAGRequestContext::addMockTable(const MockTableName & name, const MockColumnInfoVec & columnInfos, ColumnsWithTypeAndName columns, size_t concurrency_hint)
{
assertMockInput(columnInfos, columns);

addMockTable(name, columnInfos);
addMockTableSchema(name, columnInfos);
addMockTableColumnData(name, columns);
addMockTableConcurrencyHint(name, concurrency_hint);
}

void MockDAGRequestContext::addMockDeltaMergeSchema(const String & db, const String & table, const MockColumnInfoVec & columnInfos)
Expand Down
Loading

0 comments on commit 20c5a07

Please sign in to comment.