Skip to content

Commit

Permalink
refine
Browse files Browse the repository at this point in the history
Signed-off-by: xufei <xufei@pingcap.com>
  • Loading branch information
windtalker committed Jan 6, 2023
1 parent b40ccfc commit 7b3d1d8
Show file tree
Hide file tree
Showing 4 changed files with 10 additions and 6 deletions.
4 changes: 1 addition & 3 deletions dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -175,9 +175,7 @@ void DAGQueryBlockInterpreter::handleMockTableScan(const TiDBTableScan & table_s
else
{
/// build from user input blocks.
size_t scan_concurrency = max_streams;
size_t concurrency_hint = context.mockStorage()->getScanConcurrencyHint(table_scan.getLogicalTableID());
scan_concurrency = concurrency_hint == 0 ? scan_concurrency : std::min(scan_concurrency, concurrency_hint);
size_t scan_concurrency = getMockSourceStreamConcurrency(max_streams, context.mockStorage()->getScanConcurrencyHint(table_scan.getLogicalTableID()));
assert(context.mockStorage()->tableExists(table_scan.getLogicalTableID()));
NamesAndTypes names_and_types;
std::vector<std::shared_ptr<DB::MockTableScanBlockInputStream>> mock_table_scan_streams;
Expand Down
6 changes: 6 additions & 0 deletions dbms/src/Flash/Coprocessor/MockSourceStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,10 @@ std::pair<NamesAndTypes, std::vector<std::shared_ptr<MockTableScanBlockInputStre
ColumnsWithTypeAndName columns_with_type_and_name = context.mockStorage()->getColumnsForMPPTableScan(table_scan, context.mockMPPServerInfo().partition_id, context.mockMPPServerInfo().partition_num);
return cutStreams<MockTableScanBlockInputStream>(context, columns_with_type_and_name, max_streams, log);
}
size_t getMockSourceStreamConcurrency(size_t max_streams, size_t scan_concurrency_hint)
{
if (scan_concurrency_hint == 0)
return max_streams;
return std::max(std::min(max_streams, scan_concurrency_hint), 1);
}
} // namespace DB
2 changes: 2 additions & 0 deletions dbms/src/Flash/Coprocessor/MockSourceStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ std::pair<NamesAndTypes, std::vector<std::shared_ptr<SourceType>>> cutStreams(Co

std::pair<NamesAndTypes, std::vector<std::shared_ptr<MockTableScanBlockInputStream>>> mockSourceStreamForMpp(Context & context, size_t max_streams, DB::LoggerPtr log, const TiDBTableScan & table_scan);

size_t getMockSourceStreamConcurrency(size_t max_streams, size_t scan_concurrency_hint);

template <typename SourceType>
std::pair<NamesAndTypes, std::vector<std::shared_ptr<SourceType>>> mockSourceStream(Context & context, size_t max_streams, DB::LoggerPtr log, String executor_id, Int64 table_id = 0)
{
Expand Down
4 changes: 1 addition & 3 deletions dbms/src/Flash/Planner/plans/PhysicalMockTableScan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,7 @@ std::pair<NamesAndTypes, BlockInputStreams> mockSchemaAndStreams(
NamesAndTypes schema;
BlockInputStreams mock_streams;
auto & dag_context = *context.getDAGContext();
size_t max_streams = dag_context.initialize_concurrency;
size_t concurrency_hint = context.mockStorage()->getScanConcurrencyHint(table_scan.getLogicalTableID());
max_streams = concurrency_hint == 0 ? max_streams : std::min(max_streams, concurrency_hint);
size_t max_streams = getMockSourceStreamConcurrency(dag_context.initialize_concurrency, context.mockStorage()->getScanConcurrencyHint(table_scan.getLogicalTableID()));
assert(max_streams > 0);

if (context.mockStorage()->useDeltaMerge())
Expand Down

0 comments on commit 7b3d1d8

Please sign in to comment.