diff --git a/dbms/src/Debug/MockStorage.cpp b/dbms/src/Debug/MockStorage.cpp
index f47e6bdc921..50ac204199e 100644
--- a/dbms/src/Debug/MockStorage.cpp
+++ b/dbms/src/Debug/MockStorage.cpp
@@ -44,6 +44,11 @@ void MockStorage::addTableData(const String & name, ColumnsWithTypeAndName & col
     table_columns[getTableId(name)] = columns;
 }
 
+void MockStorage::addTableScanConcurrencyHint(const String & name, size_t concurrency_hint)
+{
+    table_scan_concurrency_hint[getTableId(name)] = concurrency_hint;
+}
+
 Int64 MockStorage::getTableId(const String & name)
 {
     if (name_to_id_map.find(name) != name_to_id_map.end())
@@ -67,6 +72,15 @@ ColumnsWithTypeAndName MockStorage::getColumns(Int64 table_id)
     throw Exception(fmt::format("Failed to get columns by table_id '{}'", table_id));
 }
 
+size_t MockStorage::getScanConcurrencyHint(Int64 table_id)
+{
+    // Returns the stored hint, or 0 when the table is unknown or has no hint recorded.
+    if (!tableExists(table_id))
+        return 0;
+    const auto hint = table_scan_concurrency_hint.find(table_id); // find(): operator[] would insert a zero entry on a miss
+    return hint == table_scan_concurrency_hint.end() ? 0 : hint->second;
+}
+
 MockColumnInfoVec MockStorage::getTableSchema(const String & name)
 {
     if (tableExists(getTableId(name)))
@@ -366,17 +380,8 @@ void MockStorage::addTableInfo(const String & name, const MockColumnInfoVec & co
     TableInfo table_info;
     table_info.name = name;
     table_info.id = getTableId(name);
-    int i = 0;
-    for (const auto & column : columns)
-    {
-        TiDB::ColumnInfo ret;
-        std::tie(ret.name, ret.tp) = column;
-        // TODO: find a way to assign decimal field's flen.
- if (ret.tp == TiDB::TP::TypeNewDecimal) - ret.flen = 65; - ret.id = i++; - table_info.columns.push_back(std::move(ret)); - } + auto column_infos = mockColumnInfosToTiDBColumnInfos(columns); + table_info.columns.swap(column_infos); table_infos[name] = table_info; } @@ -389,4 +394,23 @@ TableInfo MockStorage::getTableInfoForDeltaMerge(const String & name) { return table_infos_for_delta_merge[name]; } + +ColumnInfos mockColumnInfosToTiDBColumnInfos(const MockColumnInfoVec & mock_column_infos) +{ + ColumnID col_id = 0; + ColumnInfos ret; + ret.reserve(mock_column_infos.size()); + for (const auto & mock_column_info : mock_column_infos) + { + TiDB::ColumnInfo column_info; + std::tie(column_info.name, column_info.tp) = mock_column_info; + column_info.id = col_id++; + // TODO: find a way to assign decimal field's flen. + if (column_info.tp == TiDB::TP::TypeNewDecimal) + column_info.flen = 65; + ret.push_back(std::move(column_info)); + } + return ret; +} + } // namespace DB diff --git a/dbms/src/Debug/MockStorage.h b/dbms/src/Debug/MockStorage.h index abe45968aad..42b5c342473 100644 --- a/dbms/src/Debug/MockStorage.h +++ b/dbms/src/Debug/MockStorage.h @@ -45,6 +45,8 @@ class MockTableIdGenerator : public ext::Singleton std::atomic current_id = 0; }; +ColumnInfos mockColumnInfosToTiDBColumnInfos(const MockColumnInfoVec & mock_column_infos); + /** Responsible for mock data for executor tests and mpp tests. * 1. Use this class to add mock table schema and table column data. * 2. Use this class to add mock exchange schema and exchange column data. 
@@ -58,10 +60,14 @@ class MockStorage void addTableData(const String & name, ColumnsWithTypeAndName & columns); + void addTableScanConcurrencyHint(const String & name, size_t concurrency_hint); + MockColumnInfoVec getTableSchema(const String & name); ColumnsWithTypeAndName getColumns(Int64 table_id); + size_t getScanConcurrencyHint(Int64 table_id); + bool tableExists(Int64 table_id); /// for storage delta merge table scan @@ -98,6 +104,8 @@ class MockStorage TableInfo getTableInfo(const String & name); TableInfo getTableInfoForDeltaMerge(const String & name); + size_t getTableScanConcurrencyHint(const TiDBTableScan & table_scan); + /// clear for StorageDeltaMerge void clear(); @@ -111,6 +119,7 @@ class MockStorage std::unordered_map table_schema; /// std::unordered_map table_columns; /// std::unordered_map table_infos; /// + std::unordered_map table_scan_concurrency_hint; /// /// for mock exchange receiver std::unordered_map executor_id_to_name_map; /// diff --git a/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp b/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp index 2dadb2cd54c..18afe1b08e6 100644 --- a/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp +++ b/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp @@ -164,19 +164,7 @@ AnalysisResult analyzeExpressions( // for tests, we need to mock tableScan blockInputStream as the source stream. 
void DAGQueryBlockInterpreter::handleMockTableScan(const TiDBTableScan & table_scan, DAGPipeline & pipeline) { - // Interpreter test will not use columns in MockStorage - if (context.isInterpreterTest()) - { - auto names_and_types = genNamesAndTypes(table_scan, "mock_table_scan"); - auto columns_with_type_and_name = getColumnWithTypeAndName(names_and_types); - analyzer = std::make_unique(std::move(names_and_types), context); - for (size_t i = 0; i < max_streams; ++i) - { - auto mock_table_scan_stream = std::make_shared(columns_with_type_and_name, context.getSettingsRef().max_block_size); - pipeline.streams.emplace_back(mock_table_scan_stream); - } - } - else if (context.mockStorage()->useDeltaMerge()) + if (context.mockStorage()->useDeltaMerge()) { assert(context.mockStorage()->tableExistsForDeltaMerge(table_scan.getLogicalTableID())); auto names_and_types = context.mockStorage()->getNameAndTypesForDeltaMerge(table_scan.getLogicalTableID()); @@ -187,16 +175,17 @@ void DAGQueryBlockInterpreter::handleMockTableScan(const TiDBTableScan & table_s else { /// build from user input blocks. 
+ size_t scan_concurrency = getMockSourceStreamConcurrency(max_streams, context.mockStorage()->getScanConcurrencyHint(table_scan.getLogicalTableID())); assert(context.mockStorage()->tableExists(table_scan.getLogicalTableID())); NamesAndTypes names_and_types; std::vector> mock_table_scan_streams; if (context.isMPPTest()) { - std::tie(names_and_types, mock_table_scan_streams) = mockSourceStreamForMpp(context, max_streams, log, table_scan); + std::tie(names_and_types, mock_table_scan_streams) = mockSourceStreamForMpp(context, scan_concurrency, log, table_scan); } else { - std::tie(names_and_types, mock_table_scan_streams) = mockSourceStream(context, max_streams, log, table_scan.getTableScanExecutorID(), table_scan.getLogicalTableID()); + std::tie(names_and_types, mock_table_scan_streams) = mockSourceStream(context, scan_concurrency, log, table_scan.getTableScanExecutorID(), table_scan.getLogicalTableID()); } analyzer = std::make_unique(std::move(names_and_types), context); diff --git a/dbms/src/Flash/Coprocessor/GenSchemaAndColumn.cpp b/dbms/src/Flash/Coprocessor/GenSchemaAndColumn.cpp index 7ab505233d8..5fe3834b65f 100644 --- a/dbms/src/Flash/Coprocessor/GenSchemaAndColumn.cpp +++ b/dbms/src/Flash/Coprocessor/GenSchemaAndColumn.cpp @@ -56,13 +56,13 @@ String genNameForExchangeReceiver(Int32 col_index) return "exchange_receiver_" + std::to_string(col_index); } -NamesAndTypes genNamesAndTypes(const TiDBTableScan & table_scan, const StringRef & column_prefix) +NamesAndTypes genNamesAndTypes(const ColumnInfos & column_infos, const StringRef & column_prefix) { NamesAndTypes names_and_types; - names_and_types.reserve(table_scan.getColumnSize()); - for (Int32 i = 0; i < table_scan.getColumnSize(); ++i) + names_and_types.reserve(column_infos.size()); + for (size_t i = 0; i < column_infos.size(); ++i) { - const auto column_info = table_scan.getColumns()[i]; + const auto & column_info = column_infos[i]; switch (column_info.id) { case TiDBPkColumnID: @@ -72,11 +72,15 @@ 
NamesAndTypes genNamesAndTypes(const TiDBTableScan & table_scan, const StringRef names_and_types.emplace_back(MutableSupport::extra_table_id_column_name, MutableSupport::extra_table_id_column_type); break; default: - names_and_types.emplace_back(fmt::format("{}_{}", column_prefix, i), getDataTypeByColumnInfoForComputingLayer(column_info)); + names_and_types.emplace_back(column_info.name.empty() ? fmt::format("{}_{}", column_prefix, i) : column_info.name, getDataTypeByColumnInfoForComputingLayer(column_info)); } } return names_and_types; } +NamesAndTypes genNamesAndTypes(const TiDBTableScan & table_scan, const StringRef & column_prefix) +{ + return genNamesAndTypes(table_scan.getColumns(), column_prefix); +} ColumnsWithTypeAndName getColumnWithTypeAndName(const NamesAndTypes & names_and_types) { diff --git a/dbms/src/Flash/Coprocessor/GenSchemaAndColumn.h b/dbms/src/Flash/Coprocessor/GenSchemaAndColumn.h index 7c6d08113a4..245c7a086c4 100644 --- a/dbms/src/Flash/Coprocessor/GenSchemaAndColumn.h +++ b/dbms/src/Flash/Coprocessor/GenSchemaAndColumn.h @@ -28,6 +28,7 @@ NamesAndTypes genNamesAndTypesForTableScan(const TiDBTableScan & table_scan); String genNameForExchangeReceiver(Int32 col_index); NamesAndTypes genNamesAndTypes(const TiDBTableScan & table_scan, const StringRef & column_prefix); +NamesAndTypes genNamesAndTypes(const ColumnInfos & column_infos, const StringRef & column_prefix); ColumnsWithTypeAndName getColumnWithTypeAndName(const NamesAndTypes & names_and_types); NamesAndTypes toNamesAndTypes(const DAGSchema & dag_schema); } // namespace DB diff --git a/dbms/src/Flash/Coprocessor/MockSourceStream.cpp b/dbms/src/Flash/Coprocessor/MockSourceStream.cpp index df770b113aa..bd384cff50f 100644 --- a/dbms/src/Flash/Coprocessor/MockSourceStream.cpp +++ b/dbms/src/Flash/Coprocessor/MockSourceStream.cpp @@ -21,4 +21,10 @@ std::pairgetColumnsForMPPTableScan(table_scan, context.mockMPPServerInfo().partition_id, context.mockMPPServerInfo().partition_num); return 
cutStreams(context, columns_with_type_and_name, max_streams, log);
 }
+size_t getMockSourceStreamConcurrency(size_t max_streams, size_t scan_concurrency_hint)
+{
+    if (scan_concurrency_hint == 0) // a hint of 0 leaves max_streams unchanged
+        return max_streams;
+    return std::max<size_t>(std::min(max_streams, scan_concurrency_hint), 1); // explicit <size_t>: the arguments are size_t and int
+}
 } // namespace DB
diff --git a/dbms/src/Flash/Coprocessor/MockSourceStream.h b/dbms/src/Flash/Coprocessor/MockSourceStream.h
index ec771a731dc..88691eabaa6 100644
--- a/dbms/src/Flash/Coprocessor/MockSourceStream.h
+++ b/dbms/src/Flash/Coprocessor/MockSourceStream.h
@@ -66,6 +66,8 @@ std::pair>> cutStreams(Co
 
 std::pair>> mockSourceStreamForMpp(Context & context, size_t max_streams, DB::LoggerPtr log, const TiDBTableScan & table_scan);
 
+size_t getMockSourceStreamConcurrency(size_t max_streams, size_t scan_concurrency_hint);
+
 template
 std::pair>> mockSourceStream(Context & context, size_t max_streams, DB::LoggerPtr log, String executor_id, Int64 table_id = 0)
 {
diff --git a/dbms/src/Flash/Planner/plans/PhysicalMockTableScan.cpp b/dbms/src/Flash/Planner/plans/PhysicalMockTableScan.cpp
index 1d848918eb4..bd6eada3242 100644
--- a/dbms/src/Flash/Planner/plans/PhysicalMockTableScan.cpp
+++ b/dbms/src/Flash/Planner/plans/PhysicalMockTableScan.cpp
@@ -35,18 +35,10 @@ std::pair mockSchemaAndStreams(
     NamesAndTypes schema;
     BlockInputStreams mock_streams;
     auto & dag_context = *context.getDAGContext();
-    size_t max_streams = dag_context.initialize_concurrency;
+    size_t max_streams = getMockSourceStreamConcurrency(dag_context.initialize_concurrency, context.mockStorage()->getScanConcurrencyHint(table_scan.getLogicalTableID()));
     assert(max_streams > 0);
 
-    // Interpreter test will not use columns in MockStorage
-    if (context.isInterpreterTest())
-    {
-        schema = genNamesAndTypes(table_scan, "mock_table_scan");
-        auto columns_with_type_and_name = getColumnWithTypeAndName(schema);
-        for (size_t i = 0; i < max_streams; ++i)
-            mock_streams.emplace_back(std::make_shared(columns_with_type_and_name,
context.getSettingsRef().max_block_size)); - } - else if (context.mockStorage()->useDeltaMerge()) + if (context.mockStorage()->useDeltaMerge()) { assert(context.mockStorage()->tableExistsForDeltaMerge(table_scan.getLogicalTableID())); schema = context.mockStorage()->getNameAndTypesForDeltaMerge(table_scan.getLogicalTableID()); diff --git a/dbms/src/Flash/tests/gtest_scan_concurrency_hint.cpp b/dbms/src/Flash/tests/gtest_scan_concurrency_hint.cpp new file mode 100644 index 00000000000..7e00b49fe4e --- /dev/null +++ b/dbms/src/Flash/tests/gtest_scan_concurrency_hint.cpp @@ -0,0 +1,81 @@ +// Copyright 2023 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +#include + +#include +#include + +namespace DB +{ +namespace tests +{ +class ScanConcurrencyHintTest : public DB::tests::ExecutorTest +{ +public: + void initializeContext() override + { + ExecutorTest::initializeContext(); + } +}; + +TEST_F(ScanConcurrencyHintTest, InvalidHint) +try +{ + context.addMockTable("simple_test", "t1", {{"a", TiDB::TP::TypeString}, {"b", TiDB::TP::TypeString}}, {toNullableVec("a", {"1", "2", {}, "1", {}}), toNullableVec("b", {"3", "4", "3", {}, {}})}, 0); + auto request = context.scan("simple_test", "t1").build(context); + { + /// the scan concurrency hint is invalid, the final stream concurrency is the original concurrency + String expected = R"( +Union: + Expression x 10: + MockTableScan)"; + ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 10); + } +} +CATCH + +TEST_F(ScanConcurrencyHintTest, ValidHint) +try +{ + context.addMockTable("simple_test", "t1", {{"a", TiDB::TP::TypeString}, {"b", TiDB::TP::TypeString}}, {toNullableVec("a", {"1", "2", {}, "1", {}}), toNullableVec("b", {"3", "4", "3", {}, {}})}, 3); + auto request = context.scan("simple_test", "t1").build(context); + { + /// case 1, concurrency < scan concurrency hint, the final stream concurrency is the original concurrency + String expected = R"( +Expression: + MockTableScan)"; + ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 1); + } + { + /// case 2, concurrency = scan concurrency hint, the final stream concurrency is the original concurrency + String expected = R"( +Union: + Expression x 3: + MockTableScan)"; + ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 3); + } + { + /// case 3, concurrency > scan concurrency hint, the final stream concurrency is the scan concurrency hint + String expected = R"( +Union: + Expression x 3: + MockTableScan)"; + ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 10); + } +} +CATCH + +} // namespace tests +} // namespace DB diff --git a/dbms/src/TestUtils/ExecutorTestUtils.cpp b/dbms/src/TestUtils/ExecutorTestUtils.cpp index 
3949a50ab29..2153008d762 100644 --- a/dbms/src/TestUtils/ExecutorTestUtils.cpp +++ b/dbms/src/TestUtils/ExecutorTestUtils.cpp @@ -99,6 +99,7 @@ void ExecutorTest::executeInterpreter(const String & expected_string, const std: DAGContext dag_context(*request, "interpreter_test", concurrency); context.context.setDAGContext(&dag_context); context.context.setInterpreterTest(); + context.context.setMockStorage(context.mockStorage()); // Don't care regions information in interpreter tests. auto query_executor = queryExecute(context.context, /*internal=*/true); diff --git a/dbms/src/TestUtils/mockExecutor.cpp b/dbms/src/TestUtils/mockExecutor.cpp index e8012edc897..bf87c2fa2a0 100644 --- a/dbms/src/TestUtils/mockExecutor.cpp +++ b/dbms/src/TestUtils/mockExecutor.cpp @@ -360,16 +360,37 @@ DAGRequestBuilder & DAGRequestBuilder::sort(MockOrderByItemVec order_by_vec, boo return *this; } -void MockDAGRequestContext::addMockTable(const String & db, const String & table, const MockColumnInfoVec & columnInfos) +void MockDAGRequestContext::addMockTable(const String & db, const String & table, const MockColumnInfoVec & mock_column_infos) { - mock_storage->addTableSchema(db + "." + table, columnInfos); + auto columns = getColumnWithTypeAndName(genNamesAndTypes(mockColumnInfosToTiDBColumnInfos(mock_column_infos), "mock_table_scan")); + addMockTable(db, table, mock_column_infos, columns); } -void MockDAGRequestContext::addMockTable(const MockTableName & name, const MockColumnInfoVec & columnInfos) +void MockDAGRequestContext::addMockTableSchema(const String & db, const String & table, const MockColumnInfoVec & columnInfos) +{ + mock_storage->addTableSchema(db + "." + table, columnInfos); +} +void MockDAGRequestContext::addMockTableSchema(const MockTableName & name, const MockColumnInfoVec & columnInfos) { mock_storage->addTableSchema(name.first + "." 
+ name.second, columnInfos); } +void MockDAGRequestContext::addMockTable(const MockTableName & name, const MockColumnInfoVec & mock_column_infos) +{ + auto columns = getColumnWithTypeAndName(genNamesAndTypes(mockColumnInfosToTiDBColumnInfos(mock_column_infos), "mock_table_scan")); + addMockTable(name, mock_column_infos, columns); +} + +void MockDAGRequestContext::addMockTableConcurrencyHint(const String & db, const String & table, size_t concurrency_hint) +{ + mock_storage->addTableScanConcurrencyHint(db + "." + table, concurrency_hint); +} + +void MockDAGRequestContext::addMockTableConcurrencyHint(const MockTableName & name, size_t concurrency_hint) +{ + mock_storage->addTableScanConcurrencyHint(name.first + "." + name.second, concurrency_hint); +} + void MockDAGRequestContext::addExchangeRelationSchema(String name, const MockColumnInfoVec & columnInfos) { mock_storage->addExchangeSchema(name, columnInfos); @@ -398,20 +419,22 @@ void MockDAGRequestContext::addExchangeReceiverColumnData(const String & name, C mock_storage->addExchangeData(name, columns); } -void MockDAGRequestContext::addMockTable(const String & db, const String & table, const MockColumnInfoVec & columnInfos, ColumnsWithTypeAndName columns) +void MockDAGRequestContext::addMockTable(const String & db, const String & table, const MockColumnInfoVec & columnInfos, ColumnsWithTypeAndName columns, size_t concurrency_hint) { assertMockInput(columnInfos, columns); - addMockTable(db, table, columnInfos); + addMockTableSchema(db, table, columnInfos); addMockTableColumnData(db, table, columns); + addMockTableConcurrencyHint(db, table, concurrency_hint); } -void MockDAGRequestContext::addMockTable(const MockTableName & name, const MockColumnInfoVec & columnInfos, ColumnsWithTypeAndName columns) +void MockDAGRequestContext::addMockTable(const MockTableName & name, const MockColumnInfoVec & columnInfos, ColumnsWithTypeAndName columns, size_t concurrency_hint) { assertMockInput(columnInfos, columns); - 
addMockTable(name, columnInfos); + addMockTableSchema(name, columnInfos); addMockTableColumnData(name, columns); + addMockTableConcurrencyHint(name, concurrency_hint); } void MockDAGRequestContext::addMockDeltaMergeSchema(const String & db, const String & table, const MockColumnInfoVec & columnInfos) diff --git a/dbms/src/TestUtils/mockExecutor.h b/dbms/src/TestUtils/mockExecutor.h index 7e14ea903f5..89e6ce26935 100644 --- a/dbms/src/TestUtils/mockExecutor.h +++ b/dbms/src/TestUtils/mockExecutor.h @@ -180,9 +180,13 @@ class MockDAGRequestContext void addMockTable(const MockTableName & name, const MockColumnInfoVec & columnInfos); void addMockTableColumnData(const String & db, const String & table, ColumnsWithTypeAndName columns); void addMockTableColumnData(const MockTableName & name, ColumnsWithTypeAndName columns); + void addMockTableSchema(const String & db, const String & table, const MockColumnInfoVec & columnInfos); + void addMockTableSchema(const MockTableName & name, const MockColumnInfoVec & columnInfos); + void addMockTableConcurrencyHint(const String & db, const String & table, size_t concurrency_hint); + void addMockTableConcurrencyHint(const MockTableName & name, size_t concurrency_hint); - void addMockTable(const String & db, const String & table, const MockColumnInfoVec & columnInfos, ColumnsWithTypeAndName columns); - void addMockTable(const MockTableName & name, const MockColumnInfoVec & columnInfos, ColumnsWithTypeAndName columns); + void addMockTable(const String & db, const String & table, const MockColumnInfoVec & columnInfos, ColumnsWithTypeAndName columns, size_t concurrency_hint = 0); + void addMockTable(const MockTableName & name, const MockColumnInfoVec & columnInfos, ColumnsWithTypeAndName columns, size_t concurrency_hint = 0); /// mock DeltaMerge table scan void addMockDeltaMerge(const MockTableName & name, const MockColumnInfoVec & columnInfos, ColumnsWithTypeAndName columns);