This is an automated cherry-pick of pingcap#7469
Signed-off-by: ti-chi-bot <ti-community-prow-bot@tidb.io>
Lloyd-Pottiger authored and ti-chi-bot committed May 16, 2023
1 parent 1234ed4 commit f6bd003
Showing 9 changed files with 1,385 additions and 17 deletions.
167 changes: 152 additions & 15 deletions dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp
@@ -318,6 +318,106 @@ void DAGStorageInterpreter::execute(DAGPipeline & pipeline)
executeImpl(pipeline);
}

<<<<<<< HEAD
=======
SourceOps DAGStorageInterpreter::execute(PipelineExecutorStatus & exec_status)
{
prepare(); // learner read

return executeImpl(exec_status);
}

SourceOps DAGStorageInterpreter::executeImpl(PipelineExecutorStatus & exec_status)
{
auto & dag_context = dagContext();

auto scan_context = std::make_shared<DM::ScanContext>();
dag_context.scan_context_map[table_scan.getTableScanExecutorID()] = scan_context;
mvcc_query_info->scan_context = scan_context;

SourceOps source_ops;
if (!mvcc_query_info->regions_query_info.empty())
{
source_ops = buildLocalSourceOps(exec_status, context.getSettingsRef().max_block_size);
}

// Should build `remote_requests` and the `NullSourceOp` under the protection of `table_structure_lock`.
if (source_ops.empty())
{
source_ops.emplace_back(std::make_unique<NullSourceOp>(
exec_status,
storage_for_logical_table->getSampleBlockForColumns(required_columns),
log->identifier()));
}

// Note that `buildRemoteRequests` must be called after `buildLocalSourceOps` because
// `buildLocalSourceOps` sets up `region_retry_from_local_region`, and those regions
// must be retried or data will be lost.
auto remote_requests = buildRemoteRequests(scan_context);

if (dag_context.is_disaggregated_task && !remote_requests.empty())
{
// This means the RN is sending requests with stale region info; simply reject the request
// and ask the RN to send the requests again with correct region info. After the RN updates
// its region info, it may send the requests to other WNs.

RegionException::UnavailableRegions region_ids;
for (const auto & info : context.getDAGContext()->retry_regions)
region_ids.insert(info.region_id);

throw RegionException(
std::move(region_ids),
RegionException::RegionReadStatus::EPOCH_NOT_MATCH);
}

FAIL_POINT_PAUSE(FailPoints::pause_with_alter_locks_acquired);

// Release alter locks
// The DeltaTree engine ensures that once sourceOps are created, the caller can get a consistent result
// from those sourceOps even if DDL operations are applied. Release the alter lock so that reading does not
// block DDL operations, and keep the drop lock so that the storage is not dropped during reading.
const TableLockHolders drop_locks = releaseAlterLocks();

remote_read_sources_start_index = source_ops.size();

if (!remote_requests.empty())
buildRemoteSourceOps(source_ops, exec_status, remote_requests);

for (const auto & lock : drop_locks)
dagContext().addTableLock(lock);

FAIL_POINT_PAUSE(FailPoints::pause_after_copr_streams_acquired);
FAIL_POINT_PAUSE(FailPoints::pause_after_copr_streams_acquired_once);

return source_ops;
}

void DAGStorageInterpreter::executeSuffix(PipelineExecutorStatus & exec_status, PipelineExecGroupBuilder & group_builder)
{
/// handle generated column if necessary.
executeGeneratedColumnPlaceholder(exec_status, group_builder, remote_read_sources_start_index, generated_column_infos, log);
NamesAndTypes source_columns;
source_columns.reserve(table_scan.getColumnSize());
const auto table_scan_output_header = group_builder.getCurrentHeader();
for (const auto & col : table_scan_output_header)
source_columns.emplace_back(col.name, col.type);
analyzer = std::make_unique<DAGExpressionAnalyzer>(std::move(source_columns), context);
/// If there is no local source, there is no need to execute the cast or push down the filter; return directly.
/// But make sure the analyzer is initialized before returning.
if (remote_read_sources_start_index == 0)
return;
/// handle timezone/duration cast for local table scan.
executeCastAfterTableScan(exec_status, group_builder, remote_read_sources_start_index);

/// handle filter conditions for local and remote table scan.
if (filter_conditions.hasValue())
{
::DB::executePushedDownFilter(exec_status, group_builder, remote_read_sources_start_index, filter_conditions, *analyzer, log);
/// TODO: record profile
}
}

>>>>>>> 698fdde3ff (Fix query fail when there are timestamp or time columns after generated column (#7469))
void DAGStorageInterpreter::executeImpl(DAGPipeline & pipeline)
{
if (!mvcc_query_info->regions_query_info.empty())
@@ -373,11 +473,25 @@ void DAGStorageInterpreter::executeImpl(DAGPipeline & pipeline)
setQuotaAndLimitsOnTableScan(context, pipeline);
FAIL_POINT_PAUSE(FailPoints::pause_after_copr_streams_acquired);
FAIL_POINT_PAUSE(FailPoints::pause_after_copr_streams_acquired_once);

/// handle timezone/duration cast for local and remote table scan.
executeCastAfterTableScan(remote_read_streams_start_index, pipeline);
/// handle generated column if necessary.
executeGeneratedColumnPlaceholder(remote_read_streams_start_index, generated_column_infos, log, pipeline);
NamesAndTypes source_columns;
source_columns.reserve(table_scan.getColumnSize());
const auto table_scan_output_header = pipeline.firstStream()->getHeader();
for (const auto & col : table_scan_output_header)
source_columns.emplace_back(col.name, col.type);
analyzer = std::make_unique<DAGExpressionAnalyzer>(std::move(source_columns), context);
/// If there is no local stream, there is no need to execute the cast or push down the filter; return directly.
/// But make sure the analyzer is initialized before returning.
if (remote_read_streams_start_index == 0)
{
recordProfileStreams(pipeline, table_scan.getTableScanExecutorID());
if (filter_conditions.hasValue())
recordProfileStreams(pipeline, filter_conditions.executor_id);
return;
}
/// handle timezone/duration cast for local and remote table scan.
executeCastAfterTableScan(remote_read_streams_start_index, pipeline);
recordProfileStreams(pipeline, table_scan.getTableScanExecutorID());

/// handle pushed down filter for local and remote table scan.
@@ -417,9 +531,13 @@ void DAGStorageInterpreter::prepare()
assert(storages_with_structure_lock.find(logical_table_id) != storages_with_structure_lock.end());
storage_for_logical_table = storages_with_structure_lock[logical_table_id].storage;

<<<<<<< HEAD
std::tie(required_columns, source_columns, is_need_add_cast_column) = getColumnsForTableScan(settings.max_columns_to_read);

analyzer = std::make_unique<DAGExpressionAnalyzer>(std::move(source_columns), context);
=======
std::tie(required_columns, is_need_add_cast_column) = getColumnsForTableScan();
>>>>>>> 698fdde3ff (Fix query fail when there are timestamp or time columns after generated column (#7469))
}

std::tuple<ExpressionActionsPtr, String, ExpressionActionsPtr> DAGStorageInterpreter::buildPushDownFilter()
@@ -1023,7 +1141,11 @@ std::unordered_map<TableID, DAGStorageInterpreter::StorageWithStructureLock> DAG
return storages_with_lock;
}

<<<<<<< HEAD
std::tuple<Names, NamesAndTypes, std::vector<ExtraCastAfterTSMode>> DAGStorageInterpreter::getColumnsForTableScan(Int64 max_columns_to_read)
=======
std::tuple<Names, std::vector<ExtraCastAfterTSMode>> DAGStorageInterpreter::getColumnsForTableScan()
>>>>>>> 698fdde3ff (Fix query fail when there are timestamp or time columns after generated column (#7469))
{
// todo handle alias column
if (max_columns_to_read && table_scan.getColumnSize() > max_columns_to_read)
@@ -1034,7 +1156,11 @@ std::tuple<Names, NamesAndTypes, std::vector<ExtraCastAfterTSMode>> DAGStorageIn
}

Names required_columns_tmp;
<<<<<<< HEAD
NamesAndTypes source_columns_tmp;
=======
required_columns_tmp.reserve(table_scan.getColumnSize());
>>>>>>> 698fdde3ff (Fix query fail when there are timestamp or time columns after generated column (#7469))
std::vector<ExtraCastAfterTSMode> need_cast_column;
need_cast_column.reserve(table_scan.getColumnSize());
String handle_column_name = MutableSupport::tidb_pk_column_name;
@@ -1053,7 +1179,6 @@ std::tuple<Names, NamesAndTypes, std::vector<ExtraCastAfterTSMode>> DAGStorageIn
const auto & data_type = getDataTypeByColumnInfoForComputingLayer(tidb_ci);
const auto & col_name = GeneratedColumnPlaceholderBlockInputStream::getColumnName(i);
generated_column_infos.push_back(std::make_tuple(i, col_name, data_type));
source_columns_tmp.emplace_back(NameAndTypePair{col_name, data_type});
continue;
}
// Column ID -1 return the handle column
@@ -1064,26 +1189,38 @@ std::tuple<Names, NamesAndTypes, std::vector<ExtraCastAfterTSMode>> DAGStorageIn
name = MutableSupport::extra_table_id_column_name;
else
name = storage_for_logical_table->getTableInfo().getColumnName(cid);
if (cid == ExtraTableIDColumnID)
{
NameAndTypePair extra_table_id_column_pair = {name, MutableSupport::extra_table_id_column_type};
source_columns_tmp.emplace_back(std::move(extra_table_id_column_pair));
}
else
{
auto pair = storage_for_logical_table->getColumns().getPhysical(name);
source_columns_tmp.emplace_back(std::move(pair));
}
required_columns_tmp.emplace_back(std::move(name));
<<<<<<< HEAD
if (cid != -1 && ci.tp() == TiDB::TypeTimestamp)
need_cast_column.push_back(ExtraCastAfterTSMode::AppendTimeZoneCast);
else if (cid != -1 && ci.tp() == TiDB::TypeTime)
need_cast_column.push_back(ExtraCastAfterTSMode::AppendDurationCast);
=======
}

std::unordered_set<ColumnID> col_id_set;
for (const auto & expr : table_scan.getPushedDownFilters())
{
getColumnIDsFromExpr(expr, table_scan.getColumns(), col_id_set);
}
for (const auto & col : table_scan.getColumns())
{
if (col.hasGeneratedColumnFlag())
{
need_cast_column.push_back(ExtraCastAfterTSMode::None);
continue;
}

if (col_id_set.contains(col.id))
{
need_cast_column.push_back(ExtraCastAfterTSMode::None);
}
>>>>>>> 698fdde3ff (Fix query fail when there are timestamp or time columns after generated column (#7469))
else
need_cast_column.push_back(ExtraCastAfterTSMode::None);
}

return {required_columns_tmp, source_columns_tmp, need_cast_column};
return {required_columns_tmp, need_cast_column};
}

// Build remote requests from `region_retry_from_local_region` and `table_regions_info.remote_regions`
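For orientation, here is a standalone sketch of the cast-mode selection that the two conflict branches above combine, assuming the post-#7469 behavior that generated columns and columns consumed by pushed-down filters are cast in the storage layer instead. It is std-only; the helper and struct names are hypothetical, and the TiDB type codes are assumed:

#include <unordered_set>
#include <vector>

enum class ExtraCastAfterTSMode { None, AppendTimeZoneCast, AppendDurationCast };

constexpr int TypeTimestamp = 7; // assumed TiDB/MySQL type codes
constexpr int TypeTime = 11;

struct ScanColumn { long id = 0; int tp = 0; bool generated = false; };

// Hypothetical helper mirroring the selection above: generated columns and
// columns read by pushed-down filters need no extra cast here, timestamp and
// time columns keep the timezone/duration casts, everything else gets None.
std::vector<ExtraCastAfterTSMode> selectCastModes(const std::vector<ScanColumn> & cols, const std::unordered_set<long> & filter_col_ids)
{
    std::vector<ExtraCastAfterTSMode> modes;
    modes.reserve(cols.size());
    for (const auto & col : cols)
    {
        if (col.generated || filter_col_ids.count(col.id) > 0)
            modes.push_back(ExtraCastAfterTSMode::None);
        else if (col.id != -1 && col.tp == TypeTimestamp)
            modes.push_back(ExtraCastAfterTSMode::AppendTimeZoneCast);
        else if (col.id != -1 && col.tp == TypeTime)
            modes.push_back(ExtraCastAfterTSMode::AppendDurationCast);
        else
            modes.push_back(ExtraCastAfterTSMode::None);
    }
    return modes;
}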
5 changes: 4 additions & 1 deletion dbms/src/Flash/Coprocessor/DAGStorageInterpreter.h
@@ -79,7 +79,11 @@ class DAGStorageInterpreter

std::unordered_map<TableID, StorageWithStructureLock> getAndLockStorages(Int64 query_schema_version);

<<<<<<< HEAD
std::tuple<Names, NamesAndTypes, std::vector<ExtraCastAfterTSMode>> getColumnsForTableScan(Int64 max_columns_to_read);
=======
std::tuple<Names, std::vector<ExtraCastAfterTSMode>> getColumnsForTableScan();
>>>>>>> 698fdde3ff (Fix query fail when there are timestamp or time columns after generated column (#7469))

std::vector<RemoteRequest> buildRemoteRequests();

@@ -140,7 +144,6 @@ class DAGStorageInterpreter
std::unordered_map<TableID, StorageWithStructureLock> storages_with_structure_lock;
ManageableStoragePtr storage_for_logical_table;
Names required_columns;
NamesAndTypes source_columns;
// For generated column, just need a placeholder, and TiDB will fill this column.
std::vector<std::tuple<UInt64, String, DataTypePtr>> generated_column_infos;
};
2 changes: 1 addition & 1 deletion dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp
@@ -1024,7 +1024,7 @@ BlockInputStreams DeltaMergeStore::read(const Context & db_context,
{
res.emplace_back(std::make_shared<UnorderedInputStream>(
read_task_pool,
columns_to_read,
filter && filter->extra_cast ? *filter->columns_after_cast : columns_to_read,
extra_table_id_index,
physical_table_id,
log_tracing_id));
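The one-line change above decides which schema the UnorderedInputStream reports. A minimal standalone sketch of that pattern (illustrative types, not from the repository):

#include <memory>
#include <string>
#include <vector>

struct ColumnDefine { std::string name; std::string type; };
using ColumnDefines = std::vector<ColumnDefine>;

struct Filter
{
    std::shared_ptr<void> extra_cast; // non-null if timezone/duration casts were added
    std::shared_ptr<ColumnDefines> columns_after_cast; // schema after those casts
};

// When the pushed-down filter added extra casts, the stream must report the
// post-cast schema; otherwise the header would not match the produced blocks.
const ColumnDefines & outputSchema(const std::shared_ptr<Filter> & filter, const ColumnDefines & columns_to_read)
{
    return (filter && filter->extra_cast) ? *filter->columns_after_cast : columns_to_read;
}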
62 changes: 62 additions & 0 deletions dbms/src/Storages/DeltaMerge/Filter/PushDownFilter.h
@@ -0,0 +1,62 @@
// Copyright 2023 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include <Interpreters/ExpressionActions.h>
#include <Storages/DeltaMerge/Filter/RSOperator.h>

namespace DB::DM
{

class PushDownFilter;
using PushDownFilterPtr = std::shared_ptr<PushDownFilter>;
inline static const PushDownFilterPtr EMPTY_FILTER{};

class PushDownFilter : public std::enable_shared_from_this<PushDownFilter>
{
public:
PushDownFilter(const RSOperatorPtr & rs_operator_,
const ExpressionActionsPtr & before_where_,
const ColumnDefines & filter_columns_,
const String & filter_column_name_,
const ExpressionActionsPtr & extra_cast_,
const ColumnDefinesPtr & columns_after_cast_)
: rs_operator(rs_operator_)
, before_where(before_where_)
, filter_column_name(filter_column_name_)
, filter_columns(filter_columns_)
, extra_cast(extra_cast_)
, columns_after_cast(columns_after_cast_)
{}

explicit PushDownFilter(const RSOperatorPtr & rs_operator_)
: rs_operator(rs_operator_)
{}

// Rough set operator
RSOperatorPtr rs_operator;
// Filter expression actions and the name of the tmp filter column
// Used to construct the FilterBlockInputStream
ExpressionActionsPtr before_where;
String filter_column_name;
// The columns needed by the filter expression
ColumnDefines filter_columns;
// The expression actions used to cast the timestamp/datetime column
ExpressionActionsPtr extra_cast;
// If extra_cast is not null, the types of the columns may be changed by the casts
ColumnDefinesPtr columns_after_cast;
};

} // namespace DB::DM
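
A minimal construction sketch for the new class (the variable names below are illustrative, not from this commit): a filter whose expression reads a timestamp or time column carries both the cast actions and the post-cast column definitions, which DeltaMergeStore::read then prefers over the original columns_to_read:

#include <Storages/DeltaMerge/Filter/PushDownFilter.h>

// Assumed to exist in the calling context: rs_operator, before_where,
// filter_columns, filter_column_name, extra_cast, and casted_defines.
auto columns_after_cast = std::make_shared<DB::DM::ColumnDefines>(casted_defines);
auto filter = std::make_shared<DB::DM::PushDownFilter>(
    rs_operator,         // rough set index operator for segment filtering
    before_where,        // expression actions producing the filter column
    filter_columns,      // columns the filter expression reads
    filter_column_name,  // name of the tmp filter column
    extra_cast,          // timezone/duration cast actions, may be null
    columns_after_cast); // column definitions after the casts are applied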