diff --git a/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp b/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp index 561011e2d95..45a3c1e9471 100644 --- a/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp +++ b/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp @@ -28,7 +28,6 @@ #include #include #include -#include #include #include #include @@ -39,6 +38,7 @@ #include #include #include +#include #include #include #include @@ -52,7 +52,6 @@ namespace DB { namespace FailPoints { -extern const char pause_after_copr_streams_acquired[]; extern const char minimum_block_size_for_cross_join[]; } // namespace FailPoints @@ -88,23 +87,6 @@ struct AnalysisResult bool is_final_agg; }; -// add timezone cast for timestamp type, this is used to support session level timezone -bool addExtraCastsAfterTs( - DAGExpressionAnalyzer & analyzer, - const std::vector & need_cast_column, - ExpressionActionsChain & chain, - const TiDBTableScan & table_scan) -{ - bool has_need_cast_column = false; - for (auto b : need_cast_column) - { - has_need_cast_column |= (b != ExtraCastAfterTSMode::None); - } - if (!has_need_cast_column) - return false; - return analyzer.appendExtraCastsAfterTS(chain, need_cast_column, table_scan); -} - AnalysisResult analyzeExpressions( Context & context, DAGExpressionAnalyzer & analyzer, @@ -169,223 +151,16 @@ AnalysisResult analyzeExpressions( //todo need call prependProjectInput?? return res; } - -void setQuotaAndLimitsOnTableScan(Context & context, DAGPipeline & pipeline) -{ - const Settings & settings = context.getSettingsRef(); - - IProfilingBlockInputStream::LocalLimits limits; - limits.mode = IProfilingBlockInputStream::LIMITS_TOTAL; - limits.size_limits = SizeLimits(settings.max_rows_to_read, settings.max_bytes_to_read, settings.read_overflow_mode); - limits.max_execution_time = settings.max_execution_time; - limits.timeout_overflow_mode = settings.timeout_overflow_mode; - - /** Quota and minimal speed restrictions are checked on the initiating server of the request, and not on remote servers, - * because the initiating server has a summary of the execution of the request on all servers. - * - * But limits on data size to read and maximum execution time are reasonable to check both on initiator and - * additionally on each remote server, because these limits are checked per block of data processed, - * and remote servers may process way more blocks of data than are received by initiator. 
- */ - limits.min_execution_speed = settings.min_execution_speed; - limits.timeout_before_checking_execution_speed = settings.timeout_before_checking_execution_speed; - - QuotaForIntervals & quota = context.getQuota(); - - pipeline.transform([&](auto & stream) { - if (IProfilingBlockInputStream * p_stream = dynamic_cast(stream.get())) - { - p_stream->setLimits(limits); - p_stream->setQuota(quota); - } - }); -} - } // namespace -ExpressionActionsPtr generateProjectExpressionActions( - const BlockInputStreamPtr & stream, - const Context & context, - const NamesWithAliases & project_cols) -{ - auto columns = stream->getHeader(); - NamesAndTypesList input_column; - for (const auto & column : columns.getColumnsWithTypeAndName()) - { - input_column.emplace_back(column.name, column.type); - } - ExpressionActionsPtr project = std::make_shared(input_column, context.getSettingsRef()); - project->add(ExpressionAction::project(project_cols)); - return project; -} - void DAGQueryBlockInterpreter::handleTableScan(const TiDBTableScan & table_scan, DAGPipeline & pipeline) { - bool has_region_to_read = false; - for (const auto physical_table_id : table_scan.getPhysicalTableIDs()) - { - const auto & table_regions_info = dagContext().getTableRegionsInfoByTableID(physical_table_id); - if (!table_regions_info.local_regions.empty() || !table_regions_info.remote_regions.empty()) - { - has_region_to_read = true; - break; - } - } - if (!has_region_to_read) - throw TiFlashException(fmt::format("Dag Request does not have region to read for table: {}", table_scan.getLogicalTableID()), Errors::Coprocessor::BadRequest); - // construct pushed down filter conditions. - std::vector conditions; - if (query_block.selection) - { - for (const auto & condition : query_block.selection->selection().conditions()) - conditions.push_back(&condition); - } + const auto push_down_filter = PushDownFilter::toPushDownFilter(query_block.selection); - DAGStorageInterpreter storage_interpreter(context, query_block, table_scan, conditions, max_streams); + DAGStorageInterpreter storage_interpreter(context, table_scan, push_down_filter, max_streams); storage_interpreter.execute(pipeline); analyzer = std::move(storage_interpreter.analyzer); - - - auto remote_requests = std::move(storage_interpreter.remote_requests); - auto null_stream_if_empty = std::move(storage_interpreter.null_stream_if_empty); - - // It is impossible to have no joined stream. - assert(pipeline.streams_with_non_joined_data.empty()); - // after executeRemoteQueryImpl, remote read stream will be appended in pipeline.streams. - size_t remote_read_streams_start_index = pipeline.streams.size(); - - // For those regions which are not presented in this tiflash node, we will try to fetch streams by key ranges from other tiflash nodes, only happens in batch cop / mpp mode. - if (!remote_requests.empty()) - executeRemoteQueryImpl(pipeline, remote_requests); - - /// record local and remote io input stream - auto & table_scan_io_input_streams = dagContext().getInBoundIOInputStreamsMap()[query_block.source_name]; - pipeline.transform([&](auto & stream) { table_scan_io_input_streams.push_back(stream); }); - - if (pipeline.streams.empty()) - { - pipeline.streams.emplace_back(null_stream_if_empty); - // reset remote_read_streams_start_index for null_stream_if_empty. 
- remote_read_streams_start_index = 1; - } - - /// Theoretically we could move addTableLock to DAGStorageInterpreter, but we don't wants to the table to be dropped - /// during the lifetime of this query, and sometimes if there is no local region, we will use the RemoteBlockInputStream - /// or even the null_stream to hold the lock, so I would like too keep the addTableLock in DAGQueryBlockInterpreter - pipeline.transform([&](auto & stream) { - // todo do not need to hold all locks in each stream, if the stream is reading from table a - // it only needs to hold the lock of table a - for (auto & lock : storage_interpreter.drop_locks) - stream->addTableLock(lock); - }); - - /// Set the limits and quota for reading data, the speed and time of the query. - setQuotaAndLimitsOnTableScan(context, pipeline); - FAIL_POINT_PAUSE(FailPoints::pause_after_copr_streams_acquired); - - /// handle timezone/duration cast for local and remote table scan. - executeCastAfterTableScan( - table_scan, - storage_interpreter.is_need_add_cast_column, - remote_read_streams_start_index, - pipeline); - recordProfileStreams(pipeline, query_block.source_name); - - /// handle pushed down filter for local and remote table scan. - if (query_block.selection) - { - executePushedDownFilter(conditions, remote_read_streams_start_index, pipeline); - recordProfileStreams(pipeline, query_block.selection_name); - } -} - -void DAGQueryBlockInterpreter::executePushedDownFilter( - const std::vector & conditions, - size_t remote_read_streams_start_index, - DAGPipeline & pipeline) -{ - ExpressionActionsChain chain; - analyzer->initChain(chain, analyzer->getCurrentInputColumns()); - String filter_column_name = analyzer->appendWhere(chain, conditions); - ExpressionActionsPtr before_where = chain.getLastActions(); - chain.addStep(); - - // remove useless tmp column and keep the schema of local streams and remote streams the same. - NamesWithAliases project_cols; - for (const auto & col : analyzer->getCurrentInputColumns()) - { - chain.getLastStep().required_output.push_back(col.name); - project_cols.emplace_back(col.name, col.name); - } - chain.getLastActions()->add(ExpressionAction::project(project_cols)); - ExpressionActionsPtr project_after_where = chain.getLastActions(); - chain.finalize(); - chain.clear(); - - assert(pipeline.streams_with_non_joined_data.empty()); - assert(remote_read_streams_start_index <= pipeline.streams.size()); - // for remote read, filter had been pushed down, don't need to execute again. - for (size_t i = 0; i < remote_read_streams_start_index; ++i) - { - auto & stream = pipeline.streams[i]; - stream = std::make_shared(stream, before_where, filter_column_name, log->identifier()); - // after filter, do project action to keep the schema of local streams and remote streams the same. 
- stream = std::make_shared(stream, project_after_where, log->identifier()); - } -} - -void DAGQueryBlockInterpreter::executeCastAfterTableScan( - const TiDBTableScan & table_scan, - const std::vector & is_need_add_cast_column, - size_t remote_read_streams_start_index, - DAGPipeline & pipeline) -{ - auto original_source_columns = analyzer->getCurrentInputColumns(); - - ExpressionActionsChain chain; - analyzer->initChain(chain, original_source_columns); - - // execute timezone cast or duration cast if needed for local table scan - if (addExtraCastsAfterTs(*analyzer, is_need_add_cast_column, chain, table_scan)) - { - ExpressionActionsPtr extra_cast = chain.getLastActions(); - chain.finalize(); - chain.clear(); - - // After `addExtraCastsAfterTs`, analyzer->getCurrentInputColumns() has been modified. - // For remote read, `timezone cast and duration cast` had been pushed down, don't need to execute cast expressions. - // To keep the schema of local read streams and remote read streams the same, do project action for remote read streams. - NamesWithAliases project_for_remote_read; - const auto & after_cast_source_columns = analyzer->getCurrentInputColumns(); - for (size_t i = 0; i < after_cast_source_columns.size(); ++i) - { - project_for_remote_read.emplace_back(original_source_columns[i].name, after_cast_source_columns[i].name); - } - assert(!project_for_remote_read.empty()); - assert(pipeline.streams_with_non_joined_data.empty()); - assert(remote_read_streams_start_index <= pipeline.streams.size()); - size_t i = 0; - // local streams - while (i < remote_read_streams_start_index) - { - auto & stream = pipeline.streams[i++]; - stream = std::make_shared(stream, extra_cast, log->identifier()); - } - // remote streams - if (i < pipeline.streams.size()) - { - ExpressionActionsPtr project_for_cop_read = generateProjectExpressionActions( - pipeline.streams[i], - context, - project_for_remote_read); - while (i < pipeline.streams.size()) - { - auto & stream = pipeline.streams[i++]; - stream = std::make_shared(stream, project_for_cop_read, log->identifier()); - } - } - } } void DAGQueryBlockInterpreter::prepareJoin( @@ -867,71 +642,6 @@ void DAGQueryBlockInterpreter::recordProfileStreams(DAGPipeline & pipeline, cons pipeline.transform([&profile_streams](auto & stream) { profile_streams.push_back(stream); }); } -bool schemaMatch(const DAGSchema & left, const DAGSchema & right) -{ - if (left.size() != right.size()) - return false; - for (size_t i = 0; i < left.size(); i++) - { - const auto & left_ci = left[i]; - const auto & right_ci = right[i]; - if (left_ci.second.tp != right_ci.second.tp) - return false; - if (left_ci.second.flag != right_ci.second.flag) - return false; - } - return true; -} - -void DAGQueryBlockInterpreter::executeRemoteQueryImpl( - DAGPipeline & pipeline, - std::vector & remote_requests) -{ - assert(!remote_requests.empty()); - DAGSchema & schema = remote_requests[0].schema; -#ifndef NDEBUG - for (size_t i = 1; i < remote_requests.size(); i++) - { - if (!schemaMatch(schema, remote_requests[i].schema)) - throw Exception("Schema mismatch between different partitions for partition table"); - } -#endif - bool has_enforce_encode_type = remote_requests[0].dag_request.has_force_encode_type() && remote_requests[0].dag_request.force_encode_type(); - pingcap::kv::Cluster * cluster = context.getTMTContext().getKVCluster(); - std::vector all_tasks; - for (const auto & remote_request : remote_requests) - { - pingcap::coprocessor::RequestPtr req = std::make_shared(); - 
remote_request.dag_request.SerializeToString(&(req->data)); - req->tp = pingcap::coprocessor::ReqType::DAG; - req->start_ts = context.getSettingsRef().read_tso; - req->schema_version = context.getSettingsRef().schema_version; - - pingcap::kv::Backoffer bo(pingcap::kv::copBuildTaskMaxBackoff); - pingcap::kv::StoreType store_type = pingcap::kv::StoreType::TiFlash; - auto tasks = pingcap::coprocessor::buildCopTasks(bo, cluster, remote_request.key_ranges, req, store_type, &Poco::Logger::get("pingcap/coprocessor")); - all_tasks.insert(all_tasks.end(), tasks.begin(), tasks.end()); - } - - size_t concurrent_num = std::min(context.getSettingsRef().max_threads, all_tasks.size()); - size_t task_per_thread = all_tasks.size() / concurrent_num; - size_t rest_task = all_tasks.size() % concurrent_num; - for (size_t i = 0, task_start = 0; i < concurrent_num; i++) - { - size_t task_end = task_start + task_per_thread; - if (i < rest_task) - task_end++; - if (task_end == task_start) - continue; - std::vector tasks(all_tasks.begin() + task_start, all_tasks.begin() + task_end); - - auto coprocessor_reader = std::make_shared(schema, cluster, tasks, has_enforce_encode_type, 1); - BlockInputStreamPtr input = std::make_shared(coprocessor_reader, log->identifier(), query_block.source_name); - pipeline.streams.push_back(input); - task_start = task_end; - } -} - void DAGQueryBlockInterpreter::handleExchangeReceiver(DAGPipeline & pipeline) { auto it = dagContext().getMPPExchangeReceiverMap().find(query_block.source_name); @@ -1041,7 +751,7 @@ void DAGQueryBlockInterpreter::executeImpl(DAGPipeline & pipeline) } else if (query_block.isTableScanSource()) { - TiDBTableScan table_scan(query_block.source, dagContext()); + TiDBTableScan table_scan(query_block.source, query_block.source_name, dagContext()); handleTableScan(table_scan, pipeline); dagContext().table_scan_executor_id = query_block.source_name; } diff --git a/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.h b/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.h index 8e6908dec80..84253afbc45 100644 --- a/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.h +++ b/dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.h @@ -58,12 +58,6 @@ class DAGQueryBlockInterpreter #endif void executeImpl(DAGPipeline & pipeline); void handleTableScan(const TiDBTableScan & table_scan, DAGPipeline & pipeline); - void executeCastAfterTableScan( - const TiDBTableScan & table_scan, - const std::vector & is_need_add_cast_column, - size_t remote_read_streams_start_index, - DAGPipeline & pipeline); - void executePushedDownFilter(const std::vector & conditions, size_t remote_read_streams_start_index, DAGPipeline & pipeline); void handleJoin(const tipb::Join & join, DAGPipeline & pipeline, SubqueryForSet & right_query); void prepareJoin( const google::protobuf::RepeatedPtrField & keys, @@ -108,10 +102,6 @@ class DAGQueryBlockInterpreter void restorePipelineConcurrency(DAGPipeline & pipeline); - void executeRemoteQueryImpl( - DAGPipeline & pipeline, - std::vector & remote_requests); - DAGContext & dagContext() const { return *context.getDAGContext(); } Context & context; diff --git a/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp b/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp index 0a3e6396ece..f514293e7d6 100644 --- a/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp +++ b/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp @@ -15,9 +15,15 @@ #include #include #include +#include +#include +#include #include +#include +#include #include #include +#include #include 
#include #include @@ -33,6 +39,7 @@ extern const char region_exception_after_read_from_storage_some_error[]; extern const char region_exception_after_read_from_storage_all_error[]; extern const char pause_after_learner_read[]; extern const char force_remote_read_for_batch_cop[]; +extern const char pause_after_copr_streams_acquired[]; } // namespace FailPoints namespace @@ -73,7 +80,7 @@ MakeRegionQueryInfos( if (r.key_ranges.empty()) { throw TiFlashException( - "Income key ranges is empty for region: " + std::to_string(r.region_id), + fmt::format("Income key ranges is empty for region: {}", r.region_id), Errors::Coprocessor::BadRequest); } if (region_force_retry.count(id)) @@ -103,14 +110,16 @@ MakeRegionQueryInfos( if (!computeMappedTableID(*p.first, table_id_in_range) || table_id_in_range != physical_table_id) { throw TiFlashException( - "Income key ranges is illegal for region: " + std::to_string(r.region_id) - + ", table id in key range is " + std::to_string(table_id_in_range) + ", table id in region is " - + std::to_string(physical_table_id), + fmt::format( + "Income key ranges is illegal for region: {}, table id in key range is {}, table id in region is {}", + r.region_id, + table_id_in_range, + physical_table_id), Errors::Coprocessor::BadRequest); } if (p.first->compare(*info.range_in_table.first) < 0 || p.second->compare(*info.range_in_table.second) > 0) throw TiFlashException( - "Income key ranges is illegal for region: " + std::to_string(r.region_id), + fmt::format("Income key ranges is illegal for region: {}", r.region_id), Errors::Coprocessor::BadRequest); } info.required_handle_ranges = r.key_ranges; @@ -127,18 +136,78 @@ MakeRegionQueryInfos( return std::make_tuple(std::move(region_need_retry), status_res); } +bool hasRegionToRead(const DAGContext & dag_context, const TiDBTableScan & table_scan) +{ + bool has_region_to_read = false; + for (const auto physical_table_id : table_scan.getPhysicalTableIDs()) + { + const auto & table_regions_info = dag_context.getTableRegionsInfoByTableID(physical_table_id); + if (!table_regions_info.local_regions.empty() || !table_regions_info.remote_regions.empty()) + { + has_region_to_read = true; + break; + } + } + return has_region_to_read; +} + +void setQuotaAndLimitsOnTableScan(Context & context, DAGPipeline & pipeline) +{ + const Settings & settings = context.getSettingsRef(); + + IProfilingBlockInputStream::LocalLimits limits; + limits.mode = IProfilingBlockInputStream::LIMITS_TOTAL; + limits.size_limits = SizeLimits(settings.max_rows_to_read, settings.max_bytes_to_read, settings.read_overflow_mode); + limits.max_execution_time = settings.max_execution_time; + limits.timeout_overflow_mode = settings.timeout_overflow_mode; + + /** Quota and minimal speed restrictions are checked on the initiating server of the request, and not on remote servers, + * because the initiating server has a summary of the execution of the request on all servers. + * + * But limits on data size to read and maximum execution time are reasonable to check both on initiator and + * additionally on each remote server, because these limits are checked per block of data processed, + * and remote servers may process way more blocks of data than are received by initiator. 
+ */ + limits.min_execution_speed = settings.min_execution_speed; + limits.timeout_before_checking_execution_speed = settings.timeout_before_checking_execution_speed; + + QuotaForIntervals & quota = context.getQuota(); + + pipeline.transform([&](auto & stream) { + if (IProfilingBlockInputStream * p_stream = dynamic_cast(stream.get())) + { + p_stream->setLimits(limits); + p_stream->setQuota(quota); + } + }); +} + +// add timezone cast for timestamp type, this is used to support session level timezone +bool addExtraCastsAfterTs( + DAGExpressionAnalyzer & analyzer, + const std::vector & need_cast_column, + ExpressionActionsChain & chain, + const TiDBTableScan & table_scan) +{ + bool has_need_cast_column = false; + for (auto b : need_cast_column) + { + has_need_cast_column |= (b != ExtraCastAfterTSMode::None); + } + if (!has_need_cast_column) + return false; + return analyzer.appendExtraCastsAfterTS(chain, need_cast_column, table_scan); +} } // namespace DAGStorageInterpreter::DAGStorageInterpreter( Context & context_, - const DAGQueryBlock & query_block_, const TiDBTableScan & table_scan_, - const std::vector & conditions_, + const PushDownFilter & push_down_filter_, size_t max_streams_) : context(context_) - , query_block(query_block_) , table_scan(table_scan_) - , conditions(conditions_) + , push_down_filter(push_down_filter_) , max_streams(max_streams_) , log(Logger::get("DAGStorageInterpreter", context.getDAGContext()->log ? context.getDAGContext()->log->identifier() : "")) , logical_table_id(table_scan.getLogicalTableID()) @@ -146,9 +215,80 @@ DAGStorageInterpreter::DAGStorageInterpreter( , tmt(context.getTMTContext()) , mvcc_query_info(new MvccQueryInfo(true, settings.read_tso)) { + if (unlikely(!hasRegionToRead(dagContext(), table_scan))) + { + throw TiFlashException( + fmt::format("Dag Request does not have region to read for table: {}", logical_table_id), + Errors::Coprocessor::BadRequest); + } } void DAGStorageInterpreter::execute(DAGPipeline & pipeline) +{ + prepare(); + + executeImpl(pipeline); +} + +void DAGStorageInterpreter::executeImpl(DAGPipeline & pipeline) +{ + if (!mvcc_query_info->regions_query_info.empty()) + doLocalRead(pipeline, settings.max_block_size); + + null_stream_if_empty = std::make_shared(storage_for_logical_table->getSampleBlockForColumns(required_columns)); + + // Should build these vars under protect of `table_structure_lock`. + buildRemoteRequests(); + + releaseAlterLocks(); + + // It is impossible to have no joined stream. + assert(pipeline.streams_with_non_joined_data.empty()); + // after executeRemoteQuery, remote read stream will be appended in pipeline.streams. + size_t remote_read_streams_start_index = pipeline.streams.size(); + + // For those regions which are not presented in this tiflash node, we will try to fetch streams by key ranges from other tiflash nodes, only happens in batch cop / mpp mode. + if (!remote_requests.empty()) + executeRemoteQuery(pipeline); + + /// record local and remote io input stream + auto & table_scan_io_input_streams = dagContext().getInBoundIOInputStreamsMap()[table_scan.getTableScanExecutorID()]; + pipeline.transform([&](auto & stream) { table_scan_io_input_streams.push_back(stream); }); + + if (pipeline.streams.empty()) + { + pipeline.streams.emplace_back(null_stream_if_empty); + // reset remote_read_streams_start_index for null_stream_if_empty. 
+ remote_read_streams_start_index = 1; + } + + /// We don't want the table to be dropped during the lifetime of this query, + /// and sometimes if there is no local region, we will use the RemoteBlockInputStream + /// or even the null_stream to hold the lock. + pipeline.transform([&](auto & stream) { + // todo do not need to hold all locks in each stream, if the stream is reading from table a + // it only needs to hold the lock of table a + for (auto & lock : drop_locks) + stream->addTableLock(lock); + }); + + /// Set the limits and quota for reading data, the speed and time of the query. + setQuotaAndLimitsOnTableScan(context, pipeline); + FAIL_POINT_PAUSE(FailPoints::pause_after_copr_streams_acquired); + + /// handle timezone/duration cast for local and remote table scan. + executeCastAfterTableScan(remote_read_streams_start_index, pipeline); + recordProfileStreams(pipeline, table_scan.getTableScanExecutorID()); + + /// handle pushed down filter for local and remote table scan. + if (push_down_filter.hasValue()) + { + executePushedDownFilter(remote_read_streams_start_index, pipeline); + recordProfileStreams(pipeline, push_down_filter.executor_id); + } +} + +void DAGStorageInterpreter::prepare() { const DAGContext & dag_context = *context.getDAGContext(); if (dag_context.isBatchCop() || dag_context.isMPPTask()) @@ -165,16 +305,159 @@ void DAGStorageInterpreter::execute(DAGPipeline & pipeline) analyzer = std::make_unique(std::move(source_columns), context); FAIL_POINT_PAUSE(FailPoints::pause_after_learner_read); +} - if (!mvcc_query_info->regions_query_info.empty()) - doLocalRead(pipeline, settings.max_block_size); +void DAGStorageInterpreter::executePushedDownFilter( + size_t remote_read_streams_start_index, + DAGPipeline & pipeline) +{ + ExpressionActionsChain chain; + analyzer->initChain(chain, analyzer->getCurrentInputColumns()); + String filter_column_name = analyzer->appendWhere(chain, push_down_filter.conditions); + ExpressionActionsPtr before_where = chain.getLastActions(); + chain.addStep(); + + // remove useless tmp column and keep the schema of local streams and remote streams the same. + NamesWithAliases project_cols; + for (const auto & col : analyzer->getCurrentInputColumns()) + { + chain.getLastStep().required_output.push_back(col.name); + project_cols.emplace_back(col.name, col.name); + } + chain.getLastActions()->add(ExpressionAction::project(project_cols)); + ExpressionActionsPtr project_after_where = chain.getLastActions(); + chain.finalize(); + chain.clear(); + + assert(pipeline.streams_with_non_joined_data.empty()); + assert(remote_read_streams_start_index <= pipeline.streams.size()); + // for remote read, filter had been pushed down, don't need to execute again. + for (size_t i = 0; i < remote_read_streams_start_index; ++i) + { + auto & stream = pipeline.streams[i]; + stream = std::make_shared(stream, before_where, filter_column_name, log->identifier()); + // after filter, do project action to keep the schema of local streams and remote streams the same. + stream = std::make_shared(stream, project_after_where, log->identifier()); + } +} - null_stream_if_empty = std::make_shared(storage_for_logical_table->getSampleBlockForColumns(required_columns)); +void DAGStorageInterpreter::executeCastAfterTableScan( + size_t remote_read_streams_start_index, + DAGPipeline & pipeline) +{ + auto original_source_columns = analyzer->getCurrentInputColumns(); - // Should build these vars under protect of `table_structure_lock`. 
- buildRemoteRequests(); + ExpressionActionsChain chain; + analyzer->initChain(chain, original_source_columns); - releaseAlterLocks(); + // execute timezone cast or duration cast if needed for local table scan + if (addExtraCastsAfterTs(*analyzer, is_need_add_cast_column, chain, table_scan)) + { + ExpressionActionsPtr extra_cast = chain.getLastActions(); + chain.finalize(); + chain.clear(); + + // After `addExtraCastsAfterTs`, analyzer->getCurrentInputColumns() has been modified. + // For remote read, `timezone cast and duration cast` had been pushed down, don't need to execute cast expressions. + // To keep the schema of local read streams and remote read streams the same, do project action for remote read streams. + NamesWithAliases project_for_remote_read; + const auto & after_cast_source_columns = analyzer->getCurrentInputColumns(); + for (size_t i = 0; i < after_cast_source_columns.size(); ++i) + { + project_for_remote_read.emplace_back(original_source_columns[i].name, after_cast_source_columns[i].name); + } + assert(!project_for_remote_read.empty()); + assert(pipeline.streams_with_non_joined_data.empty()); + assert(remote_read_streams_start_index <= pipeline.streams.size()); + size_t i = 0; + // local streams + while (i < remote_read_streams_start_index) + { + auto & stream = pipeline.streams[i++]; + stream = std::make_shared(stream, extra_cast, log->identifier()); + } + // remote streams + if (i < pipeline.streams.size()) + { + ExpressionActionsPtr project_for_cop_read = generateProjectExpressionActions( + pipeline.streams[i], + context, + project_for_remote_read); + while (i < pipeline.streams.size()) + { + auto & stream = pipeline.streams[i++]; + stream = std::make_shared(stream, project_for_cop_read, log->identifier()); + } + } + } +} + +void DAGStorageInterpreter::executeRemoteQuery(DAGPipeline & pipeline) +{ + assert(!remote_requests.empty()); + DAGSchema & schema = remote_requests[0].schema; +#ifndef NDEBUG + auto schema_match = [&schema](const DAGSchema & other) { + if (schema.size() != other.size()) + return false; + for (size_t i = 0; i < schema.size(); ++i) + { + if (schema[i].second.tp != other[i].second.tp || schema[i].second.flag != other[i].second.flag) + return false; + } + return true; + }; + for (size_t i = 1; i < remote_requests.size(); ++i) + { + if (!schema_match(remote_requests[i].schema)) + throw Exception("Schema mismatch between different partitions for partition table"); + } +#endif + bool has_enforce_encode_type = remote_requests[0].dag_request.has_force_encode_type() && remote_requests[0].dag_request.force_encode_type(); + pingcap::kv::Cluster * cluster = tmt.getKVCluster(); + std::vector all_tasks; + for (const auto & remote_request : remote_requests) + { + pingcap::coprocessor::RequestPtr req = std::make_shared(); + remote_request.dag_request.SerializeToString(&(req->data)); + req->tp = pingcap::coprocessor::ReqType::DAG; + req->start_ts = context.getSettingsRef().read_tso; + req->schema_version = context.getSettingsRef().schema_version; + + pingcap::kv::Backoffer bo(pingcap::kv::copBuildTaskMaxBackoff); + pingcap::kv::StoreType store_type = pingcap::kv::StoreType::TiFlash; + auto tasks = pingcap::coprocessor::buildCopTasks(bo, cluster, remote_request.key_ranges, req, store_type, &Poco::Logger::get("pingcap/coprocessor")); + all_tasks.insert(all_tasks.end(), tasks.begin(), tasks.end()); + } + + size_t concurrent_num = std::min(context.getSettingsRef().max_threads, all_tasks.size()); + size_t task_per_thread = all_tasks.size() / concurrent_num; + size_t 
rest_task = all_tasks.size() % concurrent_num; + for (size_t i = 0, task_start = 0; i < concurrent_num; ++i) + { + size_t task_end = task_start + task_per_thread; + if (i < rest_task) + task_end++; + if (task_end == task_start) + continue; + std::vector tasks(all_tasks.begin() + task_start, all_tasks.begin() + task_end); + + auto coprocessor_reader = std::make_shared(schema, cluster, tasks, has_enforce_encode_type, 1); + BlockInputStreamPtr input = std::make_shared(coprocessor_reader, log->identifier(), table_scan.getTableScanExecutorID()); + pipeline.streams.push_back(input); + task_start = task_end; + } +} + +DAGContext & DAGStorageInterpreter::dagContext() const +{ + return *context.getDAGContext(); +} + +void DAGStorageInterpreter::recordProfileStreams(DAGPipeline & pipeline, const String & key) +{ + auto & profile_streams = dagContext().getProfileStreamsMap()[key]; + pipeline.transform([&profile_streams](auto & stream) { profile_streams.push_back(stream); }); } LearnerReadSnapshot DAGStorageInterpreter::doCopLearnerRead() @@ -252,7 +535,7 @@ LearnerReadSnapshot DAGStorageInterpreter::doBatchCopLearnerRead() } catch (DB::Exception & e) { - e.addMessage("(while doing learner read for table, logical table_id: " + DB::toString(logical_table_id) + ")"); + e.addMessage(fmt::format("(while doing learner read for table, logical table_id: {})", logical_table_id)); throw; } } @@ -266,7 +549,7 @@ std::unordered_map DAGStorageInterpreter::generateSele /// to avoid null point exception query_info.query = makeDummyQuery(); query_info.dag_query = std::make_unique( - conditions, + push_down_filter.conditions, analyzer->getPreparedSets(), analyzer->getCurrentInputColumns(), context.getTimezoneInfo()); @@ -418,11 +701,18 @@ void DAGStorageInterpreter::doLocalRead(DAGPipeline & pipeline, size_t max_block { // Throw an exception for TiDB / TiSpark to retry if (table_id == logical_table_id) - e.addMessage("(while creating InputStreams from storage `" + storage->getDatabaseName() + "`.`" + storage->getTableName() - + "`, table_id: " + DB::toString(table_id) + ")"); + e.addMessage(fmt::format( + "(while creating InputStreams from storage `{}`.`{}`, table_id: {})", + storage->getDatabaseName(), + storage->getTableName(), + table_id)); else - e.addMessage("(while creating InputStreams from storage `" + storage->getDatabaseName() + "`.`" + storage->getTableName() - + "`, table_id: " + DB::toString(table_id) + ", logical_table_id: " + DB::toString(logical_table_id) + ")"); + e.addMessage(fmt::format( + "(while creating InputStreams from storage `{}`.`{}`, table_id: {}, logical_table_id: {})", + storage->getDatabaseName(), + storage->getTableName(), + table_id, + logical_table_id)); throw; } } @@ -430,11 +720,18 @@ void DAGStorageInterpreter::doLocalRead(DAGPipeline & pipeline, size_t max_block { /// Other unknown exceptions if (table_id == logical_table_id) - e.addMessage("(while creating InputStreams from storage `" + storage->getDatabaseName() + "`.`" + storage->getTableName() - + "`, table_id: " + DB::toString(table_id) + ")"); + e.addMessage(fmt::format( + "(while creating InputStreams from storage `{}`.`{}`, table_id: {})", + storage->getDatabaseName(), + storage->getTableName(), + table_id)); else - e.addMessage("(while creating InputStreams from storage `" + storage->getDatabaseName() + "`.`" + storage->getTableName() - + "`, table_id: " + DB::toString(table_id) + ", logical_table_id: " + DB::toString(logical_table_id) + ")"); + e.addMessage(fmt::format( + "(while creating InputStreams from storage 
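// --- A stand-alone sketch (not part of the patch) of the cop-task
// distribution used by executeRemoteQuery above: tasks are split over
// min(max_threads, total) readers and the first `total % readers` readers
// each take one extra task, e.g. splitCopTasks(10, 4) -> {3, 3, 2, 2}.
#include <algorithm>
#include <cstddef>
#include <vector>

std::vector<size_t> splitCopTasks(size_t total, size_t max_threads)
{
    std::vector<size_t> sizes;
    const size_t concurrent_num = std::min(max_threads, total);
    for (size_t i = 0; i < concurrent_num; ++i)
        sizes.push_back(total / concurrent_num + (i < total % concurrent_num ? 1 : 0));
    return sizes;
}
// --- end sketch ---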
`{}`.`{}`, table_id: {}, logical_table_id: {})", + storage->getDatabaseName(), + storage->getTableName(), + table_id, + logical_table_id)); throw; } } @@ -450,7 +747,7 @@ std::unordered_map DAG auto logical_table_storage = tmt.getStorages().get(logical_table_id); if (!logical_table_storage) { - throw TiFlashException("Table " + std::to_string(logical_table_id) + " doesn't exist.", Errors::Table::NotExists); + throw TiFlashException(fmt::format("Table {} doesn't exist.", logical_table_id), Errors::Table::NotExists); } storages_with_lock[logical_table_id] = {logical_table_storage, logical_table_storage->lockStructureForShare(context.getCurrentQueryId())}; if (table_scan.isPartitionTableScan()) @@ -460,7 +757,7 @@ std::unordered_map DAG auto physical_table_storage = tmt.getStorages().get(physical_table_id); if (!physical_table_storage) { - throw TiFlashException("Table " + std::to_string(physical_table_id) + " doesn't exist.", Errors::Table::NotExists); + throw TiFlashException(fmt::format("Table {} doesn't exist.", physical_table_id), Errors::Table::NotExists); } storages_with_lock[physical_table_id] = {physical_table_storage, physical_table_storage->lockStructureForShare(context.getCurrentQueryId())}; } @@ -479,16 +776,20 @@ std::unordered_map DAG if (!table_store) { if (schema_synced) - throw TiFlashException("Table " + std::to_string(table_id) + " doesn't exist.", Errors::Table::NotExists); + throw TiFlashException(fmt::format("Table {} doesn't exist.", table_id), Errors::Table::NotExists); else return {{}, {}, {}, false}; } if (table_store->engineType() != ::TiDB::StorageEngine::TMT && table_store->engineType() != ::TiDB::StorageEngine::DT) { - throw TiFlashException("Specifying schema_version for non-managed storage: " + table_store->getName() - + ", table: " + table_store->getTableName() + ", id: " + DB::toString(table_id) + " is not allowed", - Errors::Coprocessor::Internal); + throw TiFlashException( + fmt::format( + "Specifying schema_version for non-managed storage: {}, table: {}, id: {} is not allowed", + table_store->getName(), + table_store->getTableName(), + table_id), + Errors::Coprocessor::Internal); } auto lock = table_store->lockStructureForShare(context.getCurrentQueryId()); @@ -502,9 +803,9 @@ std::unordered_map DAG auto storage_schema_version = table_store->getTableInfo().schema_version; // Not allow storage > query in any case, one example is time travel queries. if (storage_schema_version > query_schema_version) - throw TiFlashException("Table " + std::to_string(table_id) + " schema version " + std::to_string(storage_schema_version) - + " newer than query schema version " + std::to_string(query_schema_version), - Errors::Table::SchemaVersionError); + throw TiFlashException( + fmt::format("Table {} schema version {} newer than query schema version {}", table_id, storage_schema_version, query_schema_version), + Errors::Table::SchemaVersionError); // From now on we have storage <= query. // If schema was synced, it implies that global >= query, as mentioned above we have storage <= query, we are OK to serve. if (schema_synced) @@ -605,10 +906,9 @@ std::tuple> DAGStorageIn // todo handle alias column if (max_columns_to_read && table_scan.getColumnSize() > max_columns_to_read) { - throw TiFlashException("Limit for number of columns to read exceeded. 
" - "Requested: " - + toString(table_scan.getColumnSize()) + ", maximum: " + toString(max_columns_to_read), - Errors::BroadcastJoin::TooManyColumns); + throw TiFlashException( + fmt::format("Limit for number of columns to read exceeded. Requested: {}, maximum: {}", table_scan.getColumnSize(), max_columns_to_read), + Errors::BroadcastJoin::TooManyColumns); } Names required_columns_tmp; @@ -672,7 +972,6 @@ void DAGStorageInterpreter::buildRemoteRequests() retry_regions_map[region_id_to_table_id_map[r.get().region_id]].emplace_back(r); } - for (const auto physical_table_id : table_scan.getPhysicalTableIDs()) { const auto & retry_regions = retry_regions_map[physical_table_id]; @@ -687,7 +986,7 @@ void DAGStorageInterpreter::buildRemoteRequests() *context.getDAGContext(), table_scan, storages_with_structure_lock[physical_table_id].storage->getTableInfo(), - query_block.selection, + push_down_filter, log)); } } diff --git a/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.h b/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.h index d6760d8daab..a1d88083468 100644 --- a/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.h +++ b/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.h @@ -17,8 +17,7 @@ #include #include #include -#include -#include +#include #include #include #include @@ -50,9 +49,8 @@ class DAGStorageInterpreter public: DAGStorageInterpreter( Context & context_, - const DAGQueryBlock & query_block_, const TiDBTableScan & table_scan, - const std::vector & conditions_, + const PushDownFilter & push_down_filter_, size_t max_streams_); DAGStorageInterpreter(DAGStorageInterpreter &&) = delete; @@ -63,12 +61,6 @@ class DAGStorageInterpreter /// Members will be transfered to DAGQueryBlockInterpreter after execute std::unique_ptr analyzer; - std::vector is_need_add_cast_column; - /// it shouldn't be hash map because duplicated region id may occur if merge regions to retry of dag. - RegionRetryList region_retry_from_local_region; - TableLockHolders drop_locks; - std::vector remote_requests; - BlockInputStreamPtr null_stream_if_empty; private: struct StorageWithStructureLock @@ -92,12 +84,37 @@ class DAGStorageInterpreter std::unordered_map generateSelectQueryInfos(); + DAGContext & dagContext() const; + + void recordProfileStreams(DAGPipeline & pipeline, const String & key); + + void executeRemoteQuery(DAGPipeline & pipeline); + + void executeCastAfterTableScan( + size_t remote_read_streams_start_index, + DAGPipeline & pipeline); + + void executePushedDownFilter( + size_t remote_read_streams_start_index, + DAGPipeline & pipeline); + + void prepare(); + + void executeImpl(DAGPipeline & pipeline); + +private: + std::vector is_need_add_cast_column; + /// it shouldn't be hash map because duplicated region id may occur if merge regions to retry of dag. 
+ RegionRetryList region_retry_from_local_region; + TableLockHolders drop_locks; + std::vector remote_requests; + BlockInputStreamPtr null_stream_if_empty; + /// passed from caller, doesn't change during DAGStorageInterpreter's lifetime Context & context; - const DAGQueryBlock & query_block; const TiDBTableScan & table_scan; - const std::vector & conditions; + const PushDownFilter & push_down_filter; size_t max_streams; LoggerPtr log; diff --git a/dbms/src/Flash/Coprocessor/InterpreterUtils.cpp b/dbms/src/Flash/Coprocessor/InterpreterUtils.cpp index 25ce6aa9a06..69060071997 100644 --- a/dbms/src/Flash/Coprocessor/InterpreterUtils.cpp +++ b/dbms/src/Flash/Coprocessor/InterpreterUtils.cpp @@ -15,6 +15,7 @@ #include #include #include +#include namespace DB { @@ -79,4 +80,17 @@ void executeUnion( pipeline.streams.push_back(non_joined_data_stream); } } + +ExpressionActionsPtr generateProjectExpressionActions( + const BlockInputStreamPtr & stream, + const Context & context, + const NamesWithAliases & project_cols) +{ + NamesAndTypesList input_column; + for (const auto & column : stream->getHeader()) + input_column.emplace_back(column.name, column.type); + ExpressionActionsPtr project = std::make_shared(input_column, context.getSettingsRef()); + project->add(ExpressionAction::project(project_cols)); + return project; +} } // namespace DB diff --git a/dbms/src/Flash/Coprocessor/InterpreterUtils.h b/dbms/src/Flash/Coprocessor/InterpreterUtils.h index 932b8f404c5..91e6d483220 100644 --- a/dbms/src/Flash/Coprocessor/InterpreterUtils.h +++ b/dbms/src/Flash/Coprocessor/InterpreterUtils.h @@ -16,9 +16,12 @@ #include #include +#include namespace DB { +class Context; + void restoreConcurrency( DAGPipeline & pipeline, size_t concurrency, @@ -35,4 +38,9 @@ void executeUnion( size_t max_streams, const LoggerPtr & log, bool ignore_block = false); + +ExpressionActionsPtr generateProjectExpressionActions( + const BlockInputStreamPtr & stream, + const Context & context, + const NamesWithAliases & project_cols); } // namespace DB diff --git a/dbms/src/Flash/Coprocessor/PushDownFilter.cpp b/dbms/src/Flash/Coprocessor/PushDownFilter.cpp new file mode 100644 index 00000000000..54638f899b3 --- /dev/null +++ b/dbms/src/Flash/Coprocessor/PushDownFilter.cpp @@ -0,0 +1,65 @@ +// Copyright 2022 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
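// --- A usage sketch (not part of the patch) for generateProjectExpressionActions,
// now relocated to InterpreterUtils above. `alignRemoteSchema` is a hypothetical
// helper, and the ExpressionBlockInputStream name/include path are assumptions
// based on how the wrapper is used elsewhere in this patch: rename a remote-read
// stream's columns so its header matches the post-cast schema of local streams.
#include <DataStreams/ExpressionBlockInputStream.h> // assumed path
#include <Flash/Coprocessor/InterpreterUtils.h>

namespace DB
{
BlockInputStreamPtr alignRemoteSchema(
    const BlockInputStreamPtr & remote_stream,
    const Context & context,
    const NamesWithAliases & project_cols, // e.g. {{"origin_name", "name_after_cast"}}
    const String & req_id)
{
    ExpressionActionsPtr project = generateProjectExpressionActions(remote_stream, context, project_cols);
    return std::make_shared<ExpressionBlockInputStream>(remote_stream, project, req_id);
}
} // namespace DB
// --- end sketch ---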
+ +#include +#include +#include + +namespace DB +{ +PushDownFilter::PushDownFilter( + const String & executor_id_, + const std::vector & conditions_) + : executor_id(executor_id_) + , conditions(conditions_) +{ + if (unlikely(conditions.empty() != executor_id.empty())) + { + throw TiFlashException( + "for PushDownFilter, conditions and executor_id should both be empty or neither should be empty", + Errors::Coprocessor::BadRequest); + } +} + +tipb::Executor * PushDownFilter::constructSelectionForRemoteRead(tipb::Executor * mutable_executor) const +{ + if (hasValue()) + { + mutable_executor->set_tp(tipb::ExecType::TypeSelection); + mutable_executor->set_executor_id(executor_id); + auto * new_selection = mutable_executor->mutable_selection(); + for (const auto & condition : conditions) + *new_selection->add_conditions() = *condition; + return new_selection->mutable_child(); + } + else + { + return mutable_executor; + } +} + +PushDownFilter PushDownFilter::toPushDownFilter(const tipb::Executor * filter_executor) +{ + if (!filter_executor || !filter_executor->has_selection()) + { + return {"", {}}; + } + + std::vector conditions; + for (const auto & condition : filter_executor->selection().conditions()) + conditions.push_back(&condition); + + return {filter_executor->executor_id(), conditions}; +} +} // namespace DB \ No newline at end of file diff --git a/dbms/src/Flash/Coprocessor/PushDownFilter.h b/dbms/src/Flash/Coprocessor/PushDownFilter.h new file mode 100644 index 00000000000..0c461ef42e3 --- /dev/null +++ b/dbms/src/Flash/Coprocessor/PushDownFilter.h @@ -0,0 +1,39 @@ +// Copyright 2022 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
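// --- A minimal sketch (hypothetical driver, not part of the patch) of the
// round-trip PushDownFilter.cpp above enables, mirroring how
// RemoteRequest::build below consumes it: unwrap a Selection executor into
// conditions, then re-emit it in front of the table scan of a remote-read
// DAG request. The tipb include path is an assumption.
#include <Flash/Coprocessor/PushDownFilter.h>
#include <tipb/select.pb.h> // assumed: tipb::DAGRequest / tipb::Executor

namespace DB
{
tipb::Executor * buildRemoteRoot(const tipb::Executor * selection_executor, tipb::DAGRequest & dag_req)
{
    const PushDownFilter push_down_filter = PushDownFilter::toPushDownFilter(selection_executor);
    // With conditions, the root becomes a Selection whose child is returned for
    // the table scan; with no conditions the root executor is returned untouched.
    return push_down_filter.constructSelectionForRemoteRead(dag_req.mutable_root_executor());
}
} // namespace DB
// --- end sketch ---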
+ +#pragma once + +#include +#include + +#include + +namespace DB +{ +struct PushDownFilter +{ + static PushDownFilter toPushDownFilter(const tipb::Executor * filter_executor); + + PushDownFilter( + const String & executor_id_, + const std::vector & conditions_); + + bool hasValue() const { return !conditions.empty(); } + + tipb::Executor * constructSelectionForRemoteRead(tipb::Executor * mutable_executor) const; + + String executor_id; + std::vector conditions; +}; +} // namespace DB \ No newline at end of file diff --git a/dbms/src/Flash/Coprocessor/RemoteRequest.cpp b/dbms/src/Flash/Coprocessor/RemoteRequest.cpp index 0ef4a9de4d6..086cdb43d20 100644 --- a/dbms/src/Flash/Coprocessor/RemoteRequest.cpp +++ b/dbms/src/Flash/Coprocessor/RemoteRequest.cpp @@ -18,7 +18,13 @@ namespace DB { -RemoteRequest RemoteRequest::build(const RegionRetryList & retry_regions, DAGContext & dag_context, const TiDBTableScan & table_scan, const TiDB::TableInfo & table_info, const tipb::Executor * selection, LoggerPtr & log) +RemoteRequest RemoteRequest::build( + const RegionRetryList & retry_regions, + DAGContext & dag_context, + const TiDBTableScan & table_scan, + const TiDB::TableInfo & table_info, + const PushDownFilter & push_down_filter, + const LoggerPtr & log) { auto print_retry_regions = [&retry_regions, &table_info] { FmtBuffer buffer; @@ -35,16 +41,7 @@ RemoteRequest RemoteRequest::build(const RegionRetryList & retry_regions, DAGCon DAGSchema schema; tipb::DAGRequest dag_req; - auto * executor = dag_req.mutable_root_executor(); - if (selection != nullptr) - { - executor->set_tp(tipb::ExecType::TypeSelection); - executor->set_executor_id(selection->executor_id()); - auto * new_selection = executor->mutable_selection(); - for (const auto & condition : selection->selection().conditions()) - *new_selection->add_conditions() = condition; - executor = new_selection->mutable_child(); - } + auto * executor = push_down_filter.constructSelectionForRemoteRead(dag_req.mutable_root_executor()); { tipb::Executor * ts_exec = executor; diff --git a/dbms/src/Flash/Coprocessor/RemoteRequest.h b/dbms/src/Flash/Coprocessor/RemoteRequest.h index 37662ce36e2..1e42e18a7bd 100644 --- a/dbms/src/Flash/Coprocessor/RemoteRequest.h +++ b/dbms/src/Flash/Coprocessor/RemoteRequest.h @@ -14,6 +14,7 @@ #pragma once +#include #include #include #include @@ -34,7 +35,10 @@ using DAGSchema = std::vector; struct RemoteRequest { - RemoteRequest(tipb::DAGRequest && dag_request_, DAGSchema && schema_, std::vector && key_ranges_) + RemoteRequest( + tipb::DAGRequest && dag_request_, + DAGSchema && schema_, + std::vector && key_ranges_) : dag_request(std::move(dag_request_)) , schema(std::move(schema_)) , key_ranges(std::move(key_ranges_)) @@ -43,6 +47,12 @@ struct RemoteRequest DAGSchema schema; /// the sorted key ranges std::vector key_ranges; - static RemoteRequest build(const RegionRetryList & retry_regions, DAGContext & dag_context, const TiDBTableScan & table_scan, const TiDB::TableInfo & table_info, const tipb::Executor * selection, LoggerPtr & log); + static RemoteRequest build( + const RegionRetryList & retry_regions, + DAGContext & dag_context, + const TiDBTableScan & table_scan, + const TiDB::TableInfo & table_info, + const PushDownFilter & push_down_filter, + const LoggerPtr & log); }; } // namespace DB diff --git a/dbms/src/Flash/Coprocessor/TiDBTableScan.cpp b/dbms/src/Flash/Coprocessor/TiDBTableScan.cpp index cf19ac4efc1..7d7ad2f2b57 100644 --- a/dbms/src/Flash/Coprocessor/TiDBTableScan.cpp +++ 
b/dbms/src/Flash/Coprocessor/TiDBTableScan.cpp @@ -16,8 +16,12 @@ namespace DB { -TiDBTableScan::TiDBTableScan(const tipb::Executor * table_scan_, const DAGContext & dag_context) +TiDBTableScan::TiDBTableScan( + const tipb::Executor * table_scan_, + const String & executor_id_, + const DAGContext & dag_context) : table_scan(table_scan_) + , executor_id(executor_id_) , is_partition_table_scan(table_scan->tp() == tipb::TypePartitionTableScan) , columns(is_partition_table_scan ? table_scan->partition_table_scan().columns() : table_scan->tbl_scan().columns()) { diff --git a/dbms/src/Flash/Coprocessor/TiDBTableScan.h b/dbms/src/Flash/Coprocessor/TiDBTableScan.h index 31c145709a6..3c7703de7bf 100644 --- a/dbms/src/Flash/Coprocessor/TiDBTableScan.h +++ b/dbms/src/Flash/Coprocessor/TiDBTableScan.h @@ -24,7 +24,10 @@ namespace DB class TiDBTableScan { public: - TiDBTableScan(const tipb::Executor * table_scan_, const DAGContext & dag_context); + TiDBTableScan( + const tipb::Executor * table_scan_, + const String & executor_id_, + const DAGContext & dag_context); bool isPartitionTableScan() const { return is_partition_table_scan; @@ -48,11 +51,12 @@ class TiDBTableScan } String getTableScanExecutorID() const { - return table_scan->executor_id(); + return executor_id; } private: const tipb::Executor * table_scan; + String executor_id; bool is_partition_table_scan; const google::protobuf::RepeatedPtrField & columns; /// logical_table_id is the table id for a TiDB' table, while if the
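// --- Note (sketch of the call pattern this patch introduces; names from the
// diff above): the extra executor_id_ parameter lets the caller name the scan
// explicitly — executeImpl now passes query_block.source_name — instead of
// relying on table_scan->executor_id() being set on the tipb executor:
//   TiDBTableScan table_scan(query_block.source, query_block.source_name, dagContext());
//   const auto push_down_filter = PushDownFilter::toPushDownFilter(query_block.selection);
//   DAGStorageInterpreter storage_interpreter(context, table_scan, push_down_filter, max_streams);
//   storage_interpreter.execute(pipeline);
//   // table_scan.getTableScanExecutorID() == query_block.source_name
// --- end note ---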